提交 f6d4da92 编写于 作者: 张乐 提交者: GitHub

Merge pull request #688 from nobodyiam/misc-change

Misc change
...@@ -106,11 +106,11 @@ public class ConfigServiceLocator { ...@@ -106,11 +106,11 @@ public class ConfigServiceLocator {
transaction.setStatus(Transaction.SUCCESS); transaction.setStatus(Transaction.SUCCESS);
List<ServiceDTO> services = response.getBody(); List<ServiceDTO> services = response.getBody();
if (services == null || services.isEmpty()) { if (services == null || services.isEmpty()) {
logConfigServiceToCat("Empty response!"); logConfigService("Empty response!");
continue; continue;
} }
m_configServices.set(services); m_configServices.set(services);
logConfigServicesToCat(services); logConfigServices(services);
return; return;
} catch (Throwable ex) { } catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex)); Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
...@@ -145,13 +145,13 @@ public class ConfigServiceLocator { ...@@ -145,13 +145,13 @@ public class ConfigServiceLocator {
return domainName + "/services/config?" + MAP_JOINER.join(queryParams); return domainName + "/services/config?" + MAP_JOINER.join(queryParams);
} }
private void logConfigServicesToCat(List<ServiceDTO> serviceDtos) { private void logConfigServices(List<ServiceDTO> serviceDtos) {
for (ServiceDTO serviceDto : serviceDtos) { for (ServiceDTO serviceDto : serviceDtos) {
logConfigServiceToCat(serviceDto.getHomepageUrl()); logConfigService(serviceDto.getHomepageUrl());
} }
} }
private void logConfigServiceToCat(String serviceUrl) { private void logConfigService(String serviceUrl) {
Tracer.logEvent("Apollo.Config.Services", serviceUrl); Tracer.logEvent("Apollo.Config.Services", serviceUrl);
} }
} }
...@@ -7,6 +7,7 @@ import java.util.Properties; ...@@ -7,6 +7,7 @@ import java.util.Properties;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -17,6 +18,8 @@ import com.ctrip.framework.apollo.build.ApolloInjector; ...@@ -17,6 +18,8 @@ import com.ctrip.framework.apollo.build.ApolloInjector;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfig; import com.ctrip.framework.apollo.core.dto.ApolloConfig;
import com.ctrip.framework.apollo.core.dto.ServiceDTO; import com.ctrip.framework.apollo.core.dto.ServiceDTO;
import com.ctrip.framework.apollo.core.schedule.ExponentialSchedulePolicy;
import com.ctrip.framework.apollo.core.schedule.SchedulePolicy;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory; import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.exceptions.ApolloConfigException; import com.ctrip.framework.apollo.exceptions.ApolloConfigException;
import com.ctrip.framework.apollo.exceptions.ApolloConfigStatusCodeException; import com.ctrip.framework.apollo.exceptions.ApolloConfigStatusCodeException;
...@@ -51,6 +54,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -51,6 +54,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final static ScheduledExecutorService m_executorService; private final static ScheduledExecutorService m_executorService;
private AtomicReference<ServiceDTO> m_longPollServiceDto; private AtomicReference<ServiceDTO> m_longPollServiceDto;
private RateLimiter m_loadConfigRateLimiter; private RateLimiter m_loadConfigRateLimiter;
private AtomicBoolean m_configNeedForceRefresh;
private SchedulePolicy m_loadConfigFailSchedulePolicy;
private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper(); private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper(); private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
...@@ -73,6 +78,9 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -73,6 +78,9 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class); remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>(); m_longPollServiceDto = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS()); m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
this.trySync(); this.trySync();
this.schedulePeriodicRefresh(); this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh(); this.scheduleLongPollingRefresh();
...@@ -154,7 +162,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -154,7 +162,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
String cluster = m_configUtil.getCluster(); String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter(); String dataCenter = m_configUtil.getDataCenter();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace)); Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
int maxRetries = 2; int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null; Throwable exception = null;
List<ServiceDTO> configServices = getConfigServices(); List<ServiceDTO> configServices = getConfigServices();
...@@ -167,6 +176,18 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -167,6 +176,18 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
} }
for (ServiceDTO configService : randomConfigServices) { for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
} catch (InterruptedException e) {
//ignore
}
}
String url = String url =
assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace, assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_configCache.get()); dataCenter, m_configCache.get());
...@@ -179,6 +200,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -179,6 +200,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
try { try {
HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class); HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
transaction.addData("StatusCode", response.getStatusCode()); transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS); transaction.setStatus(Transaction.SUCCESS);
...@@ -215,13 +238,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -215,13 +238,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
transaction.complete(); transaction.complete();
} }
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
} }
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(m_configUtil.getOnErrorRetryInterval());
} catch (InterruptedException ex) {
//ignore
}
} }
String message = String.format( String message = String.format(
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s", "Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s",
...@@ -271,6 +292,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -271,6 +292,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
m_executorService.submit(new Runnable() { m_executorService.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
m_configNeedForceRefresh.set(true);
trySync(); trySync();
} }
}); });
......
package com.ctrip.framework.apollo.spring.config; package com.ctrip.framework.apollo.spring.config;
import java.util.Collection; import com.google.common.collect.HashMultimap;
import java.util.Iterator; import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Multimap;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
...@@ -13,14 +17,16 @@ import org.springframework.core.env.CompositePropertySource; ...@@ -13,14 +17,16 @@ import org.springframework.core.env.CompositePropertySource;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import com.ctrip.framework.apollo.Config; import java.util.Collection;
import com.ctrip.framework.apollo.ConfigService; import java.util.Iterator;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Multimap;
/** /**
* Apollo Property Sources processor for Spring Annotation Based Application * Apollo Property Sources processor for Spring Annotation Based Application. <br /> <br />
*
* The reason why PropertySourcesProcessor implements {@link BeanFactoryPostProcessor} instead of
* {@link org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor} is that lower versions of
* Spring (e.g. 3.1.1) doesn't support registering BeanDefinitionRegistryPostProcessor in ImportBeanDefinitionRegistrar
* - {@link com.ctrip.framework.apollo.spring.annotation.ApolloConfigRegistrar}
* *
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
*/ */
...@@ -68,7 +74,7 @@ public class PropertySourcesProcessor implements BeanFactoryPostProcessor, Envir ...@@ -68,7 +74,7 @@ public class PropertySourcesProcessor implements BeanFactoryPostProcessor, Envir
} }
//only for test //only for test
private static void reset() { private static void reset() {
NAMESPACE_NAMES.clear(); NAMESPACE_NAMES.clear();
} }
......
...@@ -107,17 +107,17 @@ public class NotificationController implements ReleaseMessageListener { ...@@ -107,17 +107,17 @@ public class NotificationController implements ReleaseMessageListener {
} }
deferredResult deferredResult
.onTimeout(() -> logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResult.onCompletion(() -> { deferredResult.onCompletion(() -> {
//unregister all keys //unregister all keys
for (String key : watchedKeys) { for (String key : watchedKeys) {
deferredResults.remove(key, deferredResult); deferredResults.remove(key, deferredResult);
} }
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.CompletedKeys"); logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
}); });
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.RegisteredKeys"); logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}", logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespace, dataCenter); watchedKeys, appId, cluster, namespace, dataCenter);
} }
...@@ -159,7 +159,7 @@ public class NotificationController implements ReleaseMessageListener { ...@@ -159,7 +159,7 @@ public class NotificationController implements ReleaseMessageListener {
logger.debug("Notification completed"); logger.debug("Notification completed");
} }
private void logWatchedKeysToCat(Set<String> watchedKeys, String eventName) { private void logWatchedKeys(Set<String> watchedKeys, String eventName) {
for (String watchedKey : watchedKeys) { for (String watchedKey : watchedKeys) {
Tracer.logEvent(eventName, watchedKey); Tracer.logEvent(eventName, watchedKey);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册