提交 52550468 编写于 作者: Y Yiming Liu

Merge pull request #135 from nobodyiam/config-server-update-push-merge

Server side config update push
package com.ctrip.apollo.adminservice;
import com.ctrip.apollo.biz.message.DummyMessageSender;
import com.ctrip.apollo.biz.message.MessageSender;
import com.ctrip.apollo.biz.message.RedisMessageSender;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Configuration
public class AdminServiceAutoConfiguration {
@ConditionalOnProperty(value = "apollo.redis.enabled", havingValue = "true", matchIfMissing = false)
public static class AdminRedisConfiguration {
@Value("${apollo.redis.host}")
private String host;
@Value("${apollo.redis.port}")
private int port;
@Bean
public JedisConnectionFactory redisConnectionFactory() {
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName(host);
factory.setPort(port);
return factory;
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
return template;
}
@Bean
public MessageSender redisMessageSender(RedisTemplate<String, String> redisTemplate) {
return new RedisMessageSender(redisTemplate);
}
}
@Configuration
@ConditionalOnProperty(value = "apollo.redis.enabled", havingValue = "false", matchIfMissing = true)
public static class ConfigDefaultConfiguration {
@Bean
public MessageSender defaultMessageSender() {
return new DummyMessageSender();
}
}
}
......@@ -12,6 +12,8 @@ import org.springframework.web.bind.annotation.RestController;
import com.ctrip.apollo.biz.entity.Namespace;
import com.ctrip.apollo.biz.entity.Release;
import com.ctrip.apollo.biz.message.MessageSender;
import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.biz.service.ConfigService;
import com.ctrip.apollo.biz.service.NamespaceService;
import com.ctrip.apollo.biz.service.ReleaseService;
......@@ -32,6 +34,9 @@ public class ReleaseController {
@Autowired
private NamespaceService namespaceService;
@Autowired
private MessageSender messageSender;
@RequestMapping("/release/{releaseId}")
public ReleaseDTO get(@PathVariable("releaseId") long releaseId) {
Release release = releaseService.findOne(releaseId);
......@@ -73,6 +78,12 @@ public class ReleaseController {
clusterName, namespaceName));
}
Release release = releaseService.buildRelease(name, comment, namespace, user.getUsername());
messageSender.sendMessage(assembleKey(appId, clusterName, namespaceName),
Topics.APOLLO_RELEASE_TOPIC);
return BeanUtils.transfrom(ReleaseDTO.class, release);
}
private String assembleKey(String appId, String cluster, String namespace) {
return String.format("%s-%s-%s", appId, cluster, namespace);
}
}
......@@ -9,12 +9,19 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.test.context.jdbc.Sql;
import org.springframework.test.context.jdbc.Sql.ExecutionPhase;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import com.ctrip.apollo.biz.entity.Namespace;
import com.ctrip.apollo.biz.message.MessageSender;
import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.biz.repository.ReleaseRepository;
import com.ctrip.apollo.biz.service.NamespaceService;
import com.ctrip.apollo.biz.service.ReleaseService;
import com.ctrip.apollo.core.dto.AppDTO;
import com.ctrip.apollo.core.dto.ClusterDTO;
import com.ctrip.apollo.core.dto.ItemDTO;
......@@ -22,6 +29,11 @@ import com.ctrip.apollo.core.dto.NamespaceDTO;
import com.ctrip.apollo.core.dto.ReleaseDTO;
import com.google.gson.Gson;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ReleaseControllerTest extends AbstractControllerTest {
@Autowired
......@@ -78,4 +90,37 @@ public class ReleaseControllerTest extends AbstractControllerTest {
Gson gson = new Gson();
Assert.assertEquals(gson.toJson(configurations), release.getConfigurations());
}
@Test
public void testMessageSendAfterBuildRelease() throws Exception {
String someAppId = "someAppId";
String someNamespaceName = "someNamespace";
String someCluster = "someCluster";
String someName = "someName";
String someComment = "someComment";
String someUserName = "someUser";
NamespaceService someNamespaceService = mock(NamespaceService.class);
ReleaseService someReleaseService = mock(ReleaseService.class);
MessageSender someMessageSender = mock(MessageSender.class);
Namespace someNamespace = mock(Namespace.class);
UserDetails someUser = mock(UserDetails.class);
ReleaseController releaseController = new ReleaseController();
ReflectionTestUtils.setField(releaseController, "releaseService", someReleaseService);
ReflectionTestUtils.setField(releaseController, "namespaceService", someNamespaceService);
ReflectionTestUtils.setField(releaseController, "messageSender", someMessageSender);
when(someNamespaceService.findOne(someAppId, someCluster, someNamespaceName))
.thenReturn(someNamespace);
when(someUser.getUsername()).thenReturn(someUserName);
releaseController
.buildRelease(someAppId, someCluster, someNamespaceName, someName, someComment, someUser);
verify(someMessageSender, times(1))
.sendMessage(String.format("%s-%s-%s", someAppId, someCluster, someNamespaceName),
Topics.APOLLO_RELEASE_TOPIC);
}
}
......@@ -22,6 +22,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
......
package com.ctrip.apollo.biz.message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class DummyMessageSender implements MessageSender{
private static final Logger logger = LoggerFactory.getLogger(DummyMessageSender.class);
@Override
public void sendMessage(String message, String channel) {
logger.warn("No message sender available! message: {}, channel: {}", message, channel);
}
}
package com.ctrip.apollo.biz.message;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public interface MessageListener {
void handleMessage(String message, String channel);
}
package com.ctrip.apollo.biz.message;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public interface MessageSender {
void sendMessage(String message, String channel);
}
package com.ctrip.apollo.biz.message;
import org.springframework.data.redis.core.RedisTemplate;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class RedisMessageSender implements MessageSender {
private RedisTemplate<String, String> redisTemplate;
public RedisMessageSender(
RedisTemplate<String, String> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public void sendMessage(String message, String channel) {
try {
redisTemplate.convertAndSend(channel, message);
} catch (Throwable ex) {
} finally {
}
}
}
package com.ctrip.apollo.biz.message;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class Topics {
public static final String APOLLO_RELEASE_TOPIC = "apollo-release";
}
package com.ctrip.apollo.biz.service;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.ctrip.apollo.biz.entity.Release;
import com.ctrip.apollo.biz.repository.ReleaseRepository;
import com.ctrip.apollo.core.dto.ApolloConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.lang.reflect.Type;
import java.util.Map;
/**
* Config Service
*
......@@ -24,30 +17,9 @@ public class ConfigService {
@Autowired
private ReleaseRepository releaseRepository;
private Gson gson = new Gson();
private Type configurationTypeReference = new TypeToken<Map<String, String>>() {}.getType();
public Release findRelease(String appId, String clusterName, String namespaceName) {
Release release = releaseRepository.findFirstByAppIdAndClusterNameAndNamespaceNameOrderByIdDesc(
appId, clusterName, namespaceName);
return release;
}
/**
* Load configuration from database
*/
public ApolloConfig loadConfig(Release release) {
if (release == null) {
return null;
}
ApolloConfig config = new ApolloConfig(release.getAppId(), release.getClusterName(),
release.getNamespaceName(), String.valueOf(release.getId()));
config.setConfigurations(transformConfigurationToMap(release.getConfigurations()));
return config;
}
Map<String, String> transformConfigurationToMap(String configurations) {
return gson.fromJson(configurations, configurationTypeReference);
}
}
package com.ctrip.apollo.biz.message;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.data.redis.core.RedisTemplate;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class RedisMessageSenderTest {
@Mock
private RedisTemplate<String, String> redisTemplate;
private RedisMessageSender redisMessageSender;
@Before
public void setUp() throws Exception {
redisMessageSender = new RedisMessageSender(redisTemplate);
}
@Test
public void testSendMessage() throws Exception {
String someMessage = "someMessage";
String someChannel = "someChannel";
redisMessageSender.sendMessage(someMessage, someChannel);
verify(redisTemplate, times(1)).convertAndSend(someChannel, someMessage);
}
@Test
public void testSendMessageWithError() throws Exception {
String someMessage = "someMessage";
String someChannel = "someChannel";
doThrow(new RuntimeException()).when(redisTemplate).convertAndSend(someChannel, someMessage);
redisMessageSender.sendMessage(someMessage, someChannel);
}
}
......@@ -39,25 +39,28 @@ public class ConfigServiceTest {
}
@Test
public void testLoadConfig() throws Exception {
public void testFindRelease() throws Exception {
String someAppId = "1";
String someClusterName = "someClusterName";
String someGroupName = "someGroupName";
String someNamespaceName = "someNamespaceName";
String someReleaseId = "1";
String someValidConfiguration = "{\"apollo.bar\": \"foo\"}";
Release someRelease = assembleRelease(someReleaseId, someAppId, someClusterName, someGroupName,
Release someRelease = assembleRelease(someReleaseId, someAppId, someClusterName, someNamespaceName,
someValidConfiguration);
when(releaseRepository.findFirstByAppIdAndClusterNameAndNamespaceNameOrderByIdDesc(someAppId,
someClusterName, someGroupName)).thenReturn(someRelease);
someClusterName, someNamespaceName)).thenReturn(someRelease);
ApolloConfig result = configService.loadConfig(someRelease);
Release result = configService.findRelease(someAppId, someClusterName, someNamespaceName);
verify(releaseRepository, times(1))
.findFirstByAppIdAndClusterNameAndNamespaceNameOrderByIdDesc(someAppId, someClusterName,
someNamespaceName);
assertEquals(someAppId, result.getAppId());
assertEquals(someClusterName, result.getCluster());
assertEquals(someReleaseId, result.getReleaseId());
assertEquals("foo", result.getConfigurations().get("apollo.bar"));
assertEquals(someClusterName, result.getClusterName());
assertEquals(someReleaseId, String.valueOf(result.getId()));
assertEquals(someValidConfiguration, result.getConfigurations());
}
@Test
......@@ -68,8 +71,8 @@ public class ConfigServiceTest {
when(releaseRepository.findFirstByAppIdAndClusterNameAndNamespaceNameOrderByIdDesc(someAppId,
someClusterName, someNamespaceName)).thenReturn(null);
Release someRelease = configService.findRelease(someAppId, someClusterName, someNamespaceName);
ApolloConfig result = configService.loadConfig(someRelease);
Release result = configService.findRelease(someAppId, someClusterName, someNamespaceName);
assertNull(result);
verify(releaseRepository, times(1)).findFirstByAppIdAndClusterNameAndNamespaceNameOrderByIdDesc(
......@@ -87,26 +90,4 @@ public class ConfigServiceTest {
return release;
}
@Test
public void testTransformConfigurationToMapSuccessful() throws Exception {
String someValidConfiguration = "{\"apollo.bar\": \"foo\"}";
Map<String, String> someMap = Maps.newHashMap();
someMap.put("apollo.bar", "foo");
Map<String, String> result = configService.transformConfigurationToMap(someValidConfiguration);
assertEquals(someMap, result);
}
@Test(expected = JsonSyntaxException.class)
public void testTransformConfigurationToMapFailed() throws Exception {
String someInvalidConfiguration = "xxx";
Map<String, String> result =
configService.transformConfigurationToMap(someInvalidConfiguration);
assertTrue(result.isEmpty());
}
}
......@@ -9,7 +9,6 @@ import com.ctrip.apollo.ConfigChangeListener;
import com.ctrip.apollo.enums.PropertyChangeType;
import com.ctrip.apollo.model.ConfigChange;
import com.ctrip.apollo.model.ConfigChangeEvent;
import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat;
import org.slf4j.Logger;
......@@ -39,8 +38,7 @@ public abstract class AbstractConfig implements Config {
listener.onChange(changeEvent);
} catch (Throwable ex) {
Cat.logError(ex);
logger.error("Failed to invoke config change listener {}", listener.getClass(), ExceptionUtil
.getDetailMessage(ex));
logger.error("Failed to invoke config change listener {}", listener.getClass(), ex);
}
}
}
......
......@@ -49,9 +49,7 @@ public abstract class AbstractConfigRepository implements ConfigRepository {
listener.onRepositoryChange(namespace, newProperties);
} catch (Throwable ex) {
Cat.logError(ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(),
ExceptionUtil
.getDetailMessage(ex));
logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
}
}
}
......
......@@ -264,7 +264,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}",
logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}, reason: {}",
appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
lastServiceDto = null;
Cat.logError(ex);
......
package com.ctrip.apollo.util;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.util.List;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class ExceptionUtil {
public static String getDetailMessage(Throwable ex) {
if (ex == null) {
if (ex == null || Strings.isNullOrEmpty(ex.getMessage())) {
return "";
}
if (ex.getCause() != null) {
return String.format("%s [Cause: %s]", ex.getMessage(), getDetailMessage(ex.getCause()));
StringBuilder builder = new StringBuilder(ex.getMessage());
List<Throwable> causes = Lists.newLinkedList();
int counter = 0;
Throwable current = ex;
//retrieve up to 10 causes
while (current.getCause() != null && counter < 10) {
Throwable next = current.getCause();
causes.add(next);
current = next;
counter++;
}
return ex.getMessage();
for (Throwable cause : causes) {
if (Strings.isNullOrEmpty(cause.getMessage())) {
counter--;
continue;
}
builder.append(" [Cause: ").append(cause.getMessage());
}
builder.append(Strings.repeat("]", counter));
return builder.toString();
}
}
......@@ -10,6 +10,7 @@ import com.ctrip.apollo.internals.SimpleConfigTest;
import com.ctrip.apollo.spi.DefaultConfigFactoryManagerTest;
import com.ctrip.apollo.spi.DefaultConfigFactoryTest;
import com.ctrip.apollo.spi.DefaultConfigRegistryTest;
import com.ctrip.apollo.util.ExceptionUtilTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......@@ -20,7 +21,7 @@ import org.junit.runners.Suite.SuiteClasses;
ConfigServiceTest.class, DefaultConfigRegistryTest.class, DefaultConfigFactoryManagerTest.class,
DefaultConfigManagerTest.class, DefaultConfigTest.class, LocalFileConfigRepositoryTest.class,
RemoteConfigRepositoryTest.class, SimpleConfigTest.class, DefaultConfigFactoryTest.class,
ConfigIntegrationTest.class
ConfigIntegrationTest.class, ExceptionUtilTest.class
})
public class AllTests {
......
......@@ -3,6 +3,7 @@ package com.ctrip.apollo.integration;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
import com.ctrip.apollo.Config;
import com.ctrip.apollo.ConfigChangeListener;
......@@ -194,6 +195,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
Config config = ConfigService.getConfig();
final List<ConfigChangeEvent> changeEvents = Lists.newArrayList();
final SettableFuture<Boolean> refreshFinished = SettableFuture.create();
config.addChangeListener(new ConfigChangeListener() {
AtomicInteger counter = new AtomicInteger(0);
......@@ -208,12 +210,13 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
assertEquals(anotherValue, changeEvent.getChange(someKey).getNewValue());
// if there is any assertion failed above, this line won't be executed
changeEvents.add(changeEvent);
refreshFinished.set(true);
}
});
apolloConfig.getConfigurations().put(someKey, anotherValue);
someRefreshTimeUnit.sleep(someRefreshInterval * 2);
refreshFinished.get(someRefreshInterval * 5, someRefreshTimeUnit);
assertThat(
"Change event's size should equal to one or there must be some assertion failed in change listener",
......@@ -242,9 +245,18 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
Config config = ConfigService.getConfig();
assertEquals(someValue, config.getProperty(someKey, null));
final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
longPollFinished.set(true);
}
});
apolloConfig.getConfigurations().put(someKey, anotherValue);
TimeUnit.MILLISECONDS.sleep(pollTimeoutInMS * 3);
longPollFinished.get(pollTimeoutInMS * 10, TimeUnit.MILLISECONDS);
assertEquals(anotherValue, config.getProperty(someKey, null));
......
package com.ctrip.apollo.util;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class ExceptionUtilTest {
@Test
public void testGetDetailMessageWithNoCause() throws Exception {
String someMessage = "some message";
Throwable ex = new Throwable(someMessage);
assertEquals(someMessage, ExceptionUtil.getDetailMessage(ex));
}
@Test
public void testGetDetailMessageWithCauses() throws Exception {
String causeMsg1 = "some cause";
String causeMsg2 = "another cause";
String someMessage = "some message";
Throwable cause2 = new Throwable(causeMsg2);
Throwable cause1 = new Throwable(causeMsg1, cause2);
Throwable ex = new Throwable(someMessage, cause1);
String expected = someMessage + " [Cause: " + causeMsg1 + " [Cause: " + causeMsg2 + "]]";
assertEquals(expected, ExceptionUtil.getDetailMessage(ex));
}
@Test
public void testGetDetailMessageWithCauseMessageNull() throws Exception {
String someMessage = "some message";
Throwable cause = new Throwable();
Throwable ex = new Throwable(someMessage, cause);
assertEquals(someMessage, ExceptionUtil.getDetailMessage(ex));
}
@Test
public void testGetDetailMessageWithNullMessage() throws Exception {
Throwable ex = new Throwable();
assertEquals("", ExceptionUtil.getDetailMessage(ex));
assertEquals("", ExceptionUtil.getDetailMessage(null));
}
}
package com.ctrip.apollo.configservice;
import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.configservice.controller.NotificationController;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Configuration
public class ConfigServiceAutoConfiguration {
@ConditionalOnProperty(value = "apollo.redis.enabled", havingValue = "true", matchIfMissing = false)
public static class ConfigRedisConfiguration {
@Value("${apollo.redis.host}")
private String host;
@Value("${apollo.redis.port}")
private int port;
@Bean
public JedisConnectionFactory redisConnectionFactory() {
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName(host);
factory.setPort(port);
return factory;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
return container;
}
@Bean
public ChannelTopic apolloReleaseTopic() {
return new ChannelTopic(Topics.APOLLO_RELEASE_TOPIC);
}
@Bean
public MessageListenerAdapter apolloMessageListener(RedisMessageListenerContainer container,
NotificationController notification,
ChannelTopic topic) {
MessageListenerAdapter adapter = new MessageListenerAdapter(notification);
container.addMessageListener(adapter, topic);
return adapter;
}
}
}
package com.ctrip.apollo.configservice.controller;
import com.google.common.base.Joiner;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.ctrip.apollo.biz.entity.Release;
import com.ctrip.apollo.biz.service.ConfigService;
import com.ctrip.apollo.core.dto.ApolloConfig;
......@@ -12,6 +19,10 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.servlet.http.HttpServletResponse;
......@@ -24,44 +35,78 @@ public class ConfigController {
@Autowired
private ConfigService configService;
private Gson gson = new Gson();
private Type configurationTypeReference =
new TypeToken<Map<java.lang.String, java.lang.String>>() {
}.getType();
@RequestMapping(value = "/{appId}/{clusterName}", method = RequestMethod.GET)
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
@RequestParam(value = "datacenter", required = false) String datacenter,
@RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId,
HttpServletResponse response) throws IOException {
//default namespace is appId
return this.queryConfig(appId, clusterName, appId, clientSideReleaseId, response);
return this.queryConfig(appId, clusterName, appId, datacenter, clientSideReleaseId, response);
}
@RequestMapping(value = "/{appId}/{clusterName}/{namespace}", method = RequestMethod.GET)
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
@PathVariable String namespace,
@RequestParam(value = "datacenter", required = false) String datacenter,
@RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId,
HttpServletResponse response) throws IOException {
Release release = configService.findRelease(appId, clusterName, namespace);
//TODO if namespace != application, should also query config by namespace and DC(default if DC not found)?
//And if found, should merge config, as well as releaseId -> make releaseId a string?
if (release == null) {
List<Release> releases = Lists.newLinkedList();
Release currentAppRelease = configService.findRelease(appId, clusterName, namespace);
if (currentAppRelease != null) {
releases.add(currentAppRelease);
}
//if namespace is not appId itself, should check if it has its own configurations
if (!Objects.equals(appId, namespace)) {
//TODO find id for this particular namespace, if not equal to current app id, then do more
if (!Objects.isNull(datacenter)) {
//TODO load newAppId+datacenter+namespace configurations
}
//TODO if load from DC failed, then load newAppId+defaultCluster+namespace configurations
}
if (releases.isEmpty()) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
String.format(
"Could not load version with appId: %s, clusterName: %s, namespace: %s",
"Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
appId, clusterName, namespace));
return null;
}
if (String.valueOf(release.getId()).equals(clientSideReleaseId)) {
String mergedReleaseId = FluentIterable.from(releases).transform(
input -> String.valueOf(input.getId())).join(Joiner.on("|"));
if (mergedReleaseId.equals(clientSideReleaseId)) {
// Client side configuration is the same with server side, return 304
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
return null;
}
ApolloConfig apolloConfig = configService.loadConfig(release);
if (apolloConfig == null) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
String.format("Could not load config with releaseId: %d, clusterName: %s",
release.getId(), clusterName));
return null;
}
ApolloConfig apolloConfig = new ApolloConfig(appId, clusterName, namespace, mergedReleaseId);
apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));
return apolloConfig;
}
/**
* Merge configurations of releases.
* Release in lower index override those in higher index
* @param releases
* @return
*/
Map<String, String> mergeReleaseConfigurations(List<Release> releases) {
Map<String, String> result = Maps.newHashMap();
for (Release release : Lists.reverse(releases)) {
result.putAll(gson.fromJson(release.getConfigurations(), configurationTypeReference));
}
return result;
}
}
package com.ctrip.apollo.configservice.controller;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.ctrip.apollo.biz.message.MessageListener;
import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.apollo.core.utils.ApolloThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -15,11 +18,9 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
......@@ -28,17 +29,11 @@ import javax.servlet.http.HttpServletResponse;
*/
@RestController
@RequestMapping("/notifications")
public class NotificationController {
public class NotificationController implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(NotificationController.class);
private final static long TIMEOUT = 120 * 60 * 1000;//120 MINUTES
private final static long TIMEOUT = 360 * 60 * 1000;//6 hours
private final Multimap<String, DeferredResult<ApolloConfigNotification>> deferredResults =
Multimaps.synchronizedSetMultimap(HashMultimap.create());
private final Multimap<DeferredResult<ApolloConfigNotification>, String> deferredResultReversed =
Multimaps.synchronizedSetMultimap(HashMultimap.create());
{
startRandomChange();
}
@RequestMapping(method = RequestMethod.GET)
public DeferredResult<ApolloConfigNotification> pollNotification(
......@@ -53,41 +48,64 @@ public class NotificationController {
namespace = appId;
}
List<String> watchedKeys = Lists.newArrayList(assembleKey(appId, cluster, namespace));
//Listen more namespaces, since it's not the default namespace
if (!Objects.equals(appId, namespace)) {
//TODO find id for this particular namespace, if not equal to current app id, then do more
if (!Objects.isNull(datacenter)) {
//TODO add newAppId+datacenter+namespace to listened keys
}
//TODO add newAppId+defaultCluster+namespace to listened keys
}
DeferredResult<ApolloConfigNotification> deferredResult =
new DeferredResult<>(TIMEOUT);
String key = assembleKey(appId, cluster, namespace);
this.deferredResults.put(key, deferredResult);
//to record all the keys related to deferredResult
this.deferredResultReversed.put(deferredResult, key);
final String finalNamespace = namespace;
//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResult);
}
deferredResult.onCompletion(() -> {
logger.info("deferred result for {} {} {} completed", appId, cluster, finalNamespace);
deferredResults.remove(key, deferredResult);
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResult);
}
});
deferredResult.onTimeout(() -> {
logger.info("deferred result for {} {} {} timeout", appId, cluster, finalNamespace);
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
});
logger.info("deferred result for {} {} {} returned", appId, cluster, namespace);
return deferredResult;
}
private void startRandomChange() {
Random random = new Random();
ScheduledExecutorService testService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("NotificationController", true));
testService.scheduleAtFixedRate((Runnable) () -> deferredResults
.entries().stream().filter(entry -> random.nextBoolean()).forEach(entry -> {
String[] keys = entry.getKey().split("-");
entry.getValue().setResult(new ApolloConfigNotification(keys[0], keys[1], keys[2]));
}), 30, 30, TimeUnit.SECONDS);
}
private String assembleKey(String appId, String cluster, String namespace) {
return String.format("%s-%s-%s", appId, cluster, namespace);
}
@Override
public void handleMessage(String message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message)) {
return;
}
String[] keys = message.split("-");
//message should be appId-cluster-namespace
if (keys.length != 3) {
logger.error("message format invalid - {}", message);
return;
}
ApolloConfigNotification notification = new ApolloConfigNotification(keys[0], keys[1], keys[2]);
Collection<DeferredResult<ApolloConfigNotification>> results = deferredResults.get(message);
logger.info("Notify {} clients for key {}", results.size(), message);
for (DeferredResult<ApolloConfigNotification> result : results) {
result.setResult(notification);
}
}
}
package com.ctrip.apollo.configservice;
import com.ctrip.apollo.configservice.controller.ConfigControllerTest;
import com.ctrip.apollo.configservice.controller.NotificationControllerTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
@RunWith(Suite.class)
@SuiteClasses({ConfigControllerTest.class})
@SuiteClasses({ConfigControllerTest.class, NotificationControllerTest.class})
public class AllTests {
}
package com.ctrip.apollo.configservice.controller;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.ctrip.apollo.biz.entity.Release;
import com.ctrip.apollo.biz.service.ConfigService;
import com.ctrip.apollo.core.dto.ApolloConfig;
......@@ -11,15 +17,17 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.any;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
......@@ -29,100 +37,119 @@ import static org.mockito.Mockito.when;
*/
@RunWith(MockitoJUnitRunner.class)
public class ConfigControllerTest {
private ConfigController configController;
@Mock
private ConfigService configService;
private String someAppId;
private String someClusterName;
private String someNamespaceName;
private String someDataCenter;
private String someValidConfiguration;
@Mock
private Release someRelease;
@Before
public void setUp() throws Exception {
configController = new ConfigController();
ReflectionTestUtils.setField(configController, "configService", configService);
someAppId = "1";
someClusterName = "someClusterName";
someNamespaceName = "someNamespaceName";
someDataCenter = "someDC";
someValidConfiguration = "{\"apollo.bar\": \"foo\"}";
when(someRelease.getConfigurations()).thenReturn(someValidConfiguration);
}
@Test
public void testQueryConfig() throws Exception {
ApolloConfig someApolloConfig = mock(ApolloConfig.class);
String someAppId = "1";
String someClusterName = "someClusterName";
String someNamespaceName = "someNamespaceName";
long someClientSideReleaseId = 1;
long someServerSideNewReleaseId = 2;
HttpServletResponse someResponse = mock(HttpServletResponse.class);
Release someRelease = mock(Release.class);
when(configService.findRelease(someAppId, someClusterName, someNamespaceName))
.thenReturn(someRelease);
when(someRelease.getId()).thenReturn(someServerSideNewReleaseId);
when(configService.loadConfig(someRelease)).thenReturn(someApolloConfig);
ApolloConfig result = configController.queryConfig(someAppId, someClusterName,
someNamespaceName, String.valueOf(someClientSideReleaseId), someResponse);
someNamespaceName, someDataCenter, String.valueOf(someClientSideReleaseId), someResponse);
assertEquals(someApolloConfig, result);
verify(configService, times(1)).findRelease(someAppId, someClusterName, someNamespaceName);
verify(configService, times(1)).loadConfig(someRelease);
assertEquals(someAppId, result.getAppId());
assertEquals(someClusterName, result.getCluster());
assertEquals(someNamespaceName, result.getNamespace());
assertEquals(String.valueOf(someServerSideNewReleaseId), result.getReleaseId());
}
@Test
public void testQueryConfigWithVersionNotFound() throws Exception {
String someAppId = "1";
String someClusterName = "someClusterName";
String someNamespaceName = "someNamespaceName";
public void testQueryConfigWithReleaseNotFound() throws Exception {
long someClientSideReleaseId = 1;
HttpServletResponse someResponse = mock(HttpServletResponse.class);
when(configService.findRelease(someAppId, someClusterName, someNamespaceName)).thenReturn(null);
ApolloConfig result = configController.queryConfig(someAppId, someClusterName,
someNamespaceName, String.valueOf(someClientSideReleaseId), someResponse);
someNamespaceName, someDataCenter, String.valueOf(someClientSideReleaseId), someResponse);
assertNull(result);
verify(someResponse, times(1)).sendError(eq(HttpServletResponse.SC_NOT_FOUND), anyString());
}
@Test
public void testQueryConfigWithApolloConfigNotFound() throws Exception {
String someAppId = "1";
String someClusterName = "someClusterName";
String someNamespaceName = "someNamespaceName";
public void testQueryConfigWithApolloConfigNotModified() throws Exception {
long someClientSideReleaseId = 1;
long someServerSideNewReleaseId = 2;
long someServerSideReleaseId = someClientSideReleaseId;
HttpServletResponse someResponse = mock(HttpServletResponse.class);
Release someRelease = mock(Release.class);
when(configService.findRelease(someAppId, someClusterName, someNamespaceName))
.thenReturn(someRelease);
when(someRelease.getId()).thenReturn(someServerSideNewReleaseId);
when(configService.loadConfig(someRelease)).thenReturn(null);
when(someRelease.getId()).thenReturn(someServerSideReleaseId);
ApolloConfig result = configController.queryConfig(someAppId, someClusterName,
someNamespaceName, String.valueOf(someClientSideReleaseId), someResponse);
ApolloConfig
result =
configController.queryConfig(someAppId, someClusterName, someNamespaceName,
someDataCenter, String.valueOf(someClientSideReleaseId), someResponse);
assertNull(result);
verify(someResponse, times(1)).sendError(eq(HttpServletResponse.SC_NOT_FOUND), anyString());
verify(someResponse, times(1)).setStatus(HttpServletResponse.SC_NOT_MODIFIED);
}
@Test
public void testQueryConfigWithApolloConfigNotModified() throws Exception {
String someAppId = "1";
String someClusterName = "someClusterName";
String someNamespaceName = "someNamespaceName";
long someClientSideReleaseId = 1;
long someServerSideReleaseId = someClientSideReleaseId;
HttpServletResponse someResponse = mock(HttpServletResponse.class);
Release someRelease = mock(Release.class);
public void testMergeConfigurations() throws Exception {
Gson gson = new Gson();
String key1 = "key1";
String value1 = "value1";
String anotherValue1 = "anotherValue1";
when(configService.findRelease(someAppId, someClusterName, someNamespaceName))
.thenReturn(someRelease);
when(someRelease.getId()).thenReturn(someServerSideReleaseId);
String key2 = "key2";
String value2 = "value2";
ApolloConfig result = configController.queryConfig(someAppId, someClusterName, someNamespaceName,
String.valueOf(someClientSideReleaseId), someResponse);
Map<String, String> config = ImmutableMap.of(key1, anotherValue1);
Map<String, String> anotherConfig = ImmutableMap.of(key1, value1, key2, value2);
assertNull(result);
verify(someResponse, times(1)).setStatus(HttpServletResponse.SC_NOT_MODIFIED);
verify(configService, never()).loadConfig(any(Release.class));
Release releaseWithHighPriority = new Release();
releaseWithHighPriority.setConfigurations(gson.toJson(config));
Release releaseWithLowPriority = new Release();
releaseWithLowPriority.setConfigurations(gson.toJson(anotherConfig));
Map<String, String> result =
configController.mergeReleaseConfigurations(
Lists.newArrayList(releaseWithHighPriority, releaseWithLowPriority));
assertEquals(2, result.keySet().size());
assertEquals(anotherValue1, result.get(key1));
assertEquals(value2, result.get(key2));
}
@Test(expected = JsonSyntaxException.class)
public void testTransformConfigurationToMapFailed() throws Exception {
String someInvalidConfiguration = "xxx";
Release someRelease = new Release();
someRelease.setConfigurations(someInvalidConfiguration);
configController.mergeReleaseConfigurations(Lists.newArrayList(someRelease));
}
}
package com.ctrip.apollo.configservice.controller;
import com.google.common.collect.Multimap;
import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class NotificationControllerTest {
private NotificationController controller;
private String someAppId;
private String someCluster;
private String someNamespace;
private String someDataCenter;
private String someReleaseId;
@Mock
private HttpServletResponse response;
private Multimap<String, DeferredResult<ApolloConfigNotification>> deferredResults;
@Before
public void setUp() throws Exception {
controller = new NotificationController();
someAppId = "someAppId";
someCluster = "someCluster";
someNamespace = "someNamespace";
someDataCenter = "someDC";
someReleaseId = "someRelease";
deferredResults =
(Multimap<String, DeferredResult<ApolloConfigNotification>>) ReflectionTestUtils
.getField(controller, "deferredResults");
}
@Test
public void testPollNotificationWithDefaultNamespace() throws Exception {
someNamespace = someAppId; //default namespace
DeferredResult<ApolloConfigNotification>
deferredResult = controller
.pollNotification(someAppId, someCluster, someNamespace, someDataCenter, someReleaseId,
response);
String key = String.format("%s-%s-%s", someAppId, someCluster, someNamespace);
assertEquals(1, deferredResults.size());
assertTrue(deferredResults.get(key).contains(deferredResult));
}
@Test
public void testPollNotificationWithDefaultNamespaceAndHandleMessage() throws Exception {
someNamespace = someAppId; //default namespace
DeferredResult<ApolloConfigNotification>
deferredResult = controller
.pollNotification(someAppId, someCluster, someNamespace, someDataCenter, someReleaseId,
response);
String key = String.format("%s-%s-%s", someAppId, someCluster, someNamespace);
controller.handleMessage(key, Topics.APOLLO_RELEASE_TOPIC);
ApolloConfigNotification notification = (ApolloConfigNotification) deferredResult.getResult();
assertEquals(someAppId, notification.getAppId());
assertEquals(someCluster, notification.getCluster());
assertEquals(someNamespace, notification.getNamespace());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册