提交 6cd742f7 编写于 作者: Y YunaiV

优雅挂逼

日志适配
上级 29d3b8e2
......@@ -17,7 +17,7 @@
package com.alibaba.dubbo.common.logger;
/**
* Level
* Level 日志级别
*/
public enum Level {
......
......@@ -18,6 +18,9 @@ package com.alibaba.dubbo.common.logger;
/**
* Logger interface
*
* Logger 接口
*
* <p>
* This interface is referred from commons-logging
*/
......
......@@ -22,6 +22,8 @@ import java.io.File;
/**
* Logger provider
*
* Logger 适配器接口
*/
@SPI
public interface LoggerAdapter {
......
......@@ -33,12 +33,22 @@ import java.util.concurrent.ConcurrentMap;
*/
public class LoggerFactory {
/**
* 已创建的 Logger 对应的映射
*
* key:类名
*/
private static final ConcurrentMap<String, FailsafeLogger> LOGGERS = new ConcurrentHashMap<String, FailsafeLogger>();
/**
* 当前使用的 LoggerAdapter 日志适配器
*/
private static volatile LoggerAdapter LOGGER_ADAPTER;
// search common-used logging frameworks
static {
// 获得 "logger" 配置项
String logger = System.getProperty("dubbo.application.logger");
// 根据配置项,进行对应的 LoggerAdapter 对象
if ("slf4j".equals(logger)) {
setLoggerAdapter(new Slf4jLoggerAdapter());
} else if ("jcl".equals(logger)) {
......@@ -48,6 +58,7 @@ public class LoggerFactory {
} else if ("jdk".equals(logger)) {
setLoggerAdapter(new JdkLoggerAdapter());
} else {
// 未配置,按照 log4j > slf4j > apache common logger > jdk logger
try {
setLoggerAdapter(new Log4jLoggerAdapter());
} catch (Throwable e1) {
......@@ -80,9 +91,12 @@ public class LoggerFactory {
*/
public static void setLoggerAdapter(LoggerAdapter loggerAdapter) {
if (loggerAdapter != null) {
// 获得 Logger 对象,并打印日志,提示设置后的 LoggerAdapter 实现类
Logger logger = loggerAdapter.getLogger(LoggerFactory.class.getName());
logger.info("using logger: " + loggerAdapter.getClass().getName());
// 设置 LOGGER_ADAPTER 属性
LoggerFactory.LOGGER_ADAPTER = loggerAdapter;
// 循环,将原有已经生成的 LOGGER 缓存对象,全部重新生成替换
for (Map.Entry<String, FailsafeLogger> entry : LOGGERS.entrySet()) {
entry.getValue().setLogger(LOGGER_ADAPTER.getLogger(entry.getKey()));
}
......@@ -96,7 +110,9 @@ public class LoggerFactory {
* @return logger
*/
public static Logger getLogger(Class<?> key) {
// 从缓存中,获得 Logger 对象
FailsafeLogger logger = LOGGERS.get(key.getName());
// 不存在,则进行创建,并进行缓存
if (logger == null) {
LOGGERS.putIfAbsent(key.getName(), new FailsafeLogger(LOGGER_ADAPTER.getLogger(key)));
logger = LOGGERS.get(key.getName());
......@@ -111,7 +127,9 @@ public class LoggerFactory {
* @return logger provider
*/
public static Logger getLogger(String key) {
// 从缓存中,获得 Logger 对象
FailsafeLogger logger = LOGGERS.get(key);
// 不存在,则进行创建,并进行缓存
if (logger == null) {
LOGGERS.putIfAbsent(key, new FailsafeLogger(LOGGER_ADAPTER.getLogger(key)));
logger = LOGGERS.get(key);
......
......@@ -52,4 +52,4 @@ public class JclLoggerAdapter implements LoggerAdapter {
this.file = file;
}
}
}
\ No newline at end of file
......@@ -79,6 +79,7 @@ public class Log4jLogger implements Logger {
logger.log(FQCN, Level.WARN, msg, e);
}
@Override
public void error(String msg) {
logger.log(FQCN, Level.ERROR, msg, null);
}
......
......@@ -27,18 +27,27 @@ import org.apache.log4j.LogManager;
import java.io.File;
import java.util.Enumeration;
/**
* Log4j 的 LoggerAdapter 实现类
*/
public class Log4jLoggerAdapter implements LoggerAdapter {
/**
* Root Logger 的文件,在构造方法中初始化
*/
private File file;
@SuppressWarnings("unchecked")
public Log4jLoggerAdapter() {
try {
// 获得 Root Logger 对象
org.apache.log4j.Logger logger = LogManager.getRootLogger();
if (logger != null) {
// 循环每个 Logger 对象的 Appender 对象
Enumeration<Appender> appenders = logger.getAllAppenders();
if (appenders != null) {
while (appenders.hasMoreElements()) {
// 当且仅当 FileAppender 时
Appender appender = appenders.nextElement();
if (appender instanceof FileAppender) {
FileAppender fileAppender = (FileAppender) appender;
......@@ -87,28 +96,33 @@ public class Log4jLoggerAdapter implements LoggerAdapter {
return Level.OFF;
}
@Override
public Logger getLogger(Class<?> key) {
return new Log4jLogger(LogManager.getLogger(key));
}
@Override
public Logger getLogger(String key) {
return new Log4jLogger(LogManager.getLogger(key));
}
@Override
public Level getLevel() {
return fromLog4jLevel(LogManager.getRootLogger().getLevel());
}
@Override
public void setLevel(Level level) {
LogManager.getRootLogger().setLevel(toLog4jLevel(level));
}
@Override
public File getFile() {
return file;
}
@Override
public void setFile(File file) {
}
}
\ No newline at end of file
......@@ -27,28 +27,34 @@ public class Slf4jLoggerAdapter implements LoggerAdapter {
private Level level;
private File file;
@Override
public Logger getLogger(String key) {
return new Slf4jLogger(org.slf4j.LoggerFactory.getLogger(key));
}
@Override
public Logger getLogger(Class<?> key) {
return new Slf4jLogger(org.slf4j.LoggerFactory.getLogger(key));
}
public Level getLevel() {
@Override
public Level getLevel() { // 无用
return level;
}
public void setLevel(Level level) {
@Override
public void setLevel(Level level) { // 无用
this.level = level;
}
public File getFile() {
@Override
public File getFile() { // 无用
return file;
}
public void setFile(File file) {
@Override
public void setFile(File file) { // 无用
this.file = file;
}
}
}
\ No newline at end of file
......@@ -22,6 +22,9 @@ import com.alibaba.dubbo.common.utils.NetUtils;
public class FailsafeLogger implements Logger {
/**
* Dubbo Logger 对象
*/
private Logger logger;
public FailsafeLogger(Logger logger) {
......@@ -117,6 +120,7 @@ public class FailsafeLogger implements Logger {
}
}
@Override
public void error(String msg) {
try {
logger.error(appendContextMessage(msg));
......
......@@ -313,13 +313,17 @@ public class ConfigUtils {
@SuppressWarnings("deprecation")
public static int getServerShutdownTimeout() {
// 默认,10 * 1000 毫秒
int timeout = Constants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
// 获得 "dubbo.service.shutdown.wait" 配置项,单位:毫秒
String value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);
if (value != null && value.length() > 0) {
try {
timeout = Integer.parseInt(value);
} catch (Exception e) {
}
// 若为空,获得 "dubbo.service.shutdown.wait.seconds" 配置项,单位:秒。
// ps:目前已经废弃该参数,推荐使用 "dubbo.service.shutdown.wait"
} else {
value = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);
if (value != null && value.length() > 0) {
......@@ -329,7 +333,7 @@ public class ConfigUtils {
}
}
}
// 返回
return timeout;
}
......
......@@ -28,7 +28,9 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorUtil {
private static final Logger logger = LoggerFactory.getLogger(ExecutorUtil.class);
private static final ThreadPoolExecutor shutdownExecutor = new ThreadPoolExecutor(0, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(100),
......@@ -36,17 +38,17 @@ public class ExecutorUtil {
public static boolean isShutdown(Executor executor) {
if (executor instanceof ExecutorService) {
if (((ExecutorService) executor).isShutdown()) {
return true;
}
return ((ExecutorService) executor).isShutdown();
}
return false;
}
public static void gracefulShutdown(Executor executor, int timeout) {
// 忽略,若不是 ExecutorService ,或者已经关闭
if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
return;
}
// 关闭,禁止新的任务提交,将原有任务执行完
final ExecutorService es = (ExecutorService) executor;
try {
es.shutdown(); // Disable new tasks from being submitted
......@@ -55,23 +57,28 @@ public class ExecutorUtil {
} catch (NullPointerException ex2) {
return;
}
// 等待原有任务执行完。若等待超时,强制结束所有任务
try {
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
// 发生 InterruptedException 异常,也强制结束所有任务
es.shutdownNow();
Thread.currentThread().interrupt();
}
// 若未关闭成功,新开线程去关闭
if (!isShutdown(es)) {
newThreadToCloseExecutor(es);
}
}
public static void shutdownNow(Executor executor, final int timeout) {
// 忽略,若不是 ExecutorService ,或者已经关闭
if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
return;
}
// 立即关闭,包括原有任务也打断
final ExecutorService es = (ExecutorService) executor;
try {
es.shutdownNow();
......@@ -80,11 +87,13 @@ public class ExecutorUtil {
} catch (NullPointerException ex2) {
return;
}
// 等待原有任务被打断完成
try {
es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// 若未关闭成功,新开线程去关闭
if (!isShutdown(es)) {
newThreadToCloseExecutor(es);
}
......@@ -95,8 +104,11 @@ public class ExecutorUtil {
shutdownExecutor.execute(new Runnable() {
public void run() {
try {
// 循环 1000 次,不断强制结束线程池
for (int i = 0; i < 1000; i++) {
// 立即关闭,包括原有任务也打断
es.shutdownNow();
// 等待原有任务被打断完成
if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
......
......@@ -93,7 +93,7 @@ public abstract class AbstractConfig implements Serializable {
legacyProperties.put("dubbo.service.url", "dubbo.service.address");
}
static { // TODO 芋艿,注意
static {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (logger.isInfoEnabled()) {
......
......@@ -238,8 +238,8 @@ public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
// 添加到结果
if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) // 服务提供者 && 注册
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { // 服务消费者 && 订阅
if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) // 服务提供者 && 注册 https://dubbo.gitbooks.io/dubbo-user-book/demos/subscribe-only.html
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { // 服务消费者 && 订阅 https://dubbo.gitbooks.io/dubbo-user-book/demos/registry-only.html
registryList.add(url);
}
}
......
......@@ -60,7 +60,7 @@ public class ApplicationConfig extends AbstractConfig {
private String compiler; // TODO 芋艿
// logger
private String logger; // TODO 芋艿
private String logger;
// registry centers
private List<RegistryConfig> registries;
......@@ -203,6 +203,7 @@ public class ApplicationConfig extends AbstractConfig {
public void setLogger(String logger) {
this.logger = logger;
// 设置 LoggerAdapter
LoggerFactory.setLoggerAdapter(logger);
}
......
......@@ -77,13 +77,13 @@ public class ProtocolConfig extends AbstractConfig {
private Integer queues;
// max acceptable connections
private Integer accepts; // TODO 芋艿,
private Integer accepts;
// protocol codec
private String codec; // TODO 芋艿,
private String codec;
// serialization
private String serialization; // TODO 芋艿,
private String serialization;
// charset
private String charset;
......@@ -95,31 +95,31 @@ public class ProtocolConfig extends AbstractConfig {
private Integer buffer;
// heartbeat interval
private Integer heartbeat; // TODO 芋艿,
private Integer heartbeat;
// access log
private String accesslog; // TODO 芋艿,
private String accesslog;
// transfort
private String transporter; // TODO 芋艿,
private String transporter;
// how information is exchanged
private String exchanger; // TODO 芋艿,
private String exchanger;
// thread dispatch mode
private String dispatcher; // TODO 芋艿,
private String dispatcher;
// networker
private String networker; // TODO ,芋艿
// sever impl
private String server; // TODO ,芋艿
private String server;
// client impl
private String client; // TODO ,芋艿
private String client;
// supported telnet commands, separated with comma.
private String telnet; // TODO ,芋艿
private String telnet;
// command line prompt
private String prompt; // TODO ,芋艿
......@@ -133,19 +133,21 @@ public class ProtocolConfig extends AbstractConfig {
// parameters
// 是否长连接
// TODO add this to provider config
private Boolean keepAlive; // TODO ,芋艿
private Boolean keepAlive;
// TODO add this to provider config
private String optimizer; // TODO ,芋艿
private String optimizer;
private String extension; // TODO ,芋艿
private String extension;
// parameters
private Map<String, String> parameters;
// if it's default
private Boolean isDefault;
/**
* 是否已经销毁
*/
private static final AtomicBoolean destroyed = new AtomicBoolean(false);
public ProtocolConfig() {
......@@ -162,11 +164,14 @@ public class ProtocolConfig extends AbstractConfig {
// TODO: 2017/8/30 to move this method somewhere else
public static void destroyAll() {
// 忽略,若已经销毁
if (!destroyed.compareAndSet(false, true)) {
return;
}
// 销毁 Registry 相关
AbstractRegistryFactory.destroyAll();
// 等到服务消费,接收到注册中心通知到该服务提供者已经下线,加大了在不重试情况下优雅停机的成功率。
// Wait for registry notification
try {
Thread.sleep(ConfigUtils.getServerShutdownTimeout());
......@@ -174,6 +179,7 @@ public class ProtocolConfig extends AbstractConfig {
logger.warn("Interrupted unexpectedly when waiting for registry notification during shutdown process!");
}
// 销毁 Protocol 相关
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
for (String protocolName : loader.getLoadedExtensions()) {
try {
......
......@@ -442,7 +442,7 @@ public class ServiceConfig<T> extends AbstractServiceConfig {
if (unexported) {
return;
}
if (exporters != null && !exporters.isEmpty()) {
if (!exporters.isEmpty()) {
for (Exporter<?> exporter : exporters) {
try {
exporter.unexport();
......
......@@ -16,6 +16,7 @@
*/
package com.alibaba.dubbo.demo.consumer;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.demo.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;
......@@ -41,10 +42,14 @@ public class Consumer {
// demoService.say01(null);
demoService.say01("TestException");
// demoService.say01("TestException");
// demoService.hello("01");
// ((EchoService) demoService).$echo("test4u");
// ((EchoService) demoService).$echo("test4u");
demoService.sayHello("world");
ProtocolConfig.destroyAll();
Thread.sleep(10000000);
// demoService.say02();
// demoService.say03();
......
......@@ -36,7 +36,7 @@ limitations under the License.
<!-- generate proxy for the remote service, then demoService can be used in the same way as the
local regular interface -->
<dubbo:reference id="demoService" group="g1" check="false" interface="com.alibaba.dubbo.demo.DemoService"
<dubbo:reference id="demoService" group="g1" check="true" interface="com.alibaba.dubbo.demo.DemoService"
client="netty4" timeout="10000" callbacks="1000" registry="zk01">
<!--<dubbo:method name="sayHello" />-->
......
......@@ -16,6 +16,7 @@
*/
package com.alibaba.dubbo.demo.provider;
import com.alibaba.dubbo.config.ProtocolConfig;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Provider {
......@@ -27,6 +28,8 @@ public class Provider {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
context.start();
ProtocolConfig.destroyAll();
System.in.read(); // press any key to exit
}
......
......@@ -22,7 +22,7 @@ limitations under the License.
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="demo-provider"/>
<dubbo:application name="demo-provider" logger="jcl"/>
<dubbo:provider delay="-1" retries="0" >
<!--<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" protocol="pb" />-->
......@@ -32,7 +32,7 @@ limitations under the License.
<!--<dubbo:registry address="multicast://224.5.6.7:1234"/>-->
<!--<dubbo:registry address="zookeeper://127.0.0.1:2181"/>-->
<!--<dubbo:registry address="zookeeper://127.0.0.1:2181||10.20.153.11:2181,10.20.153.12:2181"/>-->
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!--<dubbo:registry address="redis://127.0.0.1:6379" />-->
......@@ -44,7 +44,8 @@ limitations under the License.
<!--<bean id="genericService" class="com.alibaba.dubbo.demo.provider.MyGenericService" />-->
<!-- declare the service interface to be exported -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" group="g1" ref="demoService" filter="demo" deprecated="false" callbacks="1000" timeout="200000" accesslog="true">
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" group="g1" ref="demoService" filter="demo" deprecated="false" callbacks="1000" timeout="200000" accesslog="true"
>
<!--<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" protocol="dubbo" ref="demoService"/>-->
<!--<dubbo:service id="sa" interface="com.alibaba.dubbo.demo.DemoService" protocol="dubbo" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl">-->
<!--&lt;!&ndash;<dubbo:method name="sayHello" retries="100">&ndash;&gt;-->
......
dubbo.application.qos.port=22222
dubbo.application.logger=jdk
dubbo.registry.address=192.168.0.1
# serviceA
......
......@@ -581,7 +581,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
}
}
// 不检查连接是否成功,总是创建 Invoker !
// 不检查连接是否成功,总是创建 Invoker !因为,启动检查,只有启动阶段需要。此时在检查,已经没必要了。
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
// The combination of directoryUrl and override is at the end of notify, which can't be handled here
......
......@@ -288,7 +288,7 @@ public class RegistryProtocol implements Protocol {
private URL getSubscribedOverrideUrl(URL registedProviderUrl) {
return registedProviderUrl.setProtocol(Constants.PROVIDER_PROTOCOL)
.addParameters(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY, // configurators
Constants.CHECK_KEY, String.valueOf(false)); // 订阅失败,不校验
Constants.CHECK_KEY, String.valueOf(false)); // 订阅失败,不校验。因为,不需要检查。
}
/**
......@@ -374,7 +374,7 @@ public class RegistryProtocol implements Protocol {
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
Constants.CHECK_KEY, String.valueOf(false))); // 不检查的原因是,不需要检查。
}
// 向注册中心订阅服务提供者 + 路由规则 + 配置规则
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
......@@ -389,11 +389,15 @@ public class RegistryProtocol implements Protocol {
return invoker;
}
@Override
public void destroy() {
// 获得 Exporter 数组
List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
// 取消所有 Exporter 的暴露
for (Exporter<?> exporter : exporters) {
exporter.unexport();
}
// 清空
bounds.clear();
}
......
......@@ -577,6 +577,7 @@ public abstract class AbstractRegistry implements Registry {
/**
* 取消注册和订阅
*/
@Override
public void destroy() {
// 已销毁,跳过
if (!destroyed.compareAndSet(false, true)) {
......@@ -591,7 +592,7 @@ public abstract class AbstractRegistry implements Registry {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url); // 取消
unregister(url); // 取消注册
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
......@@ -608,7 +609,7 @@ public abstract class AbstractRegistry implements Registry {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener); // 取消
unsubscribe(url, listener); // 取消订阅
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
......
......@@ -63,7 +63,7 @@ public abstract class AbstractRegistryFactory implements RegistryFactory {
}
/**
* 销毁
* 销毁所有 Registry
*
* Close all created registries
*/
......
......@@ -530,10 +530,13 @@ public abstract class FailbackRegistry extends AbstractRegistry {
@Override
public void destroy() {
if (!canDestroy()){
// 忽略,若已经销毁
if (!canDestroy()) {
return;
}
// 调用父方法,取消注册和订阅
super.destroy();
// 销毁重试任务
try {
retryFuture.cancel(true);
} catch (Throwable t) {
......
......@@ -118,8 +118,10 @@ public class ZookeeperRegistry extends FailbackRegistry {
@Override
public void destroy() {
// 调用父方法,取消注册和订阅
super.destroy();
try {
// 关闭 Zookeeper 客户端连接
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
......
......@@ -56,10 +56,12 @@ public abstract class AbstractExporter<T> implements Exporter<T> {
@Override
public void unexport() {
// 标记已经取消暴露
if (unexported) {
return;
}
unexported = true;
// 销毁
getInvoker().destroy();
}
......
......@@ -114,6 +114,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
return url;
}
@Override
public boolean isAvailable() {
return available;
}
......@@ -122,6 +123,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
this.available = available;
}
@Override
public void destroy() {
if (!destroyed.compareAndSet(false, true)) {
return;
......
......@@ -77,6 +77,7 @@ public abstract class AbstractProtocol implements Protocol {
@Override
public void destroy() {
// 销毁协议对应的服务消费者的所有 Invoker
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
invokers.remove(invoker);
......@@ -90,6 +91,7 @@ public abstract class AbstractProtocol implements Protocol {
}
}
}
// 销毁协议对应的服务提供者的所有 Exporter
for (String key : new ArrayList<String>(exporterMap.keySet())) {
Exporter<?> exporter = exporterMap.remove(key);
if (exporter != null) {
......@@ -104,4 +106,5 @@ public abstract class AbstractProtocol implements Protocol {
}
}
}
}
\ No newline at end of file
......@@ -127,6 +127,7 @@ public class ProtocolFilterWrapper implements Protocol {
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
@Override
public void destroy() {
protocol.destroy();
}
......
......@@ -72,6 +72,7 @@ public class ProtocolListenerWrapper implements Protocol {
return new ListenerInvokerWrapper<T>(invoker, listeners);
}
@Override
public void destroy() {
protocol.destroy();
}
......
......@@ -72,6 +72,7 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
return true;
}
@Override
public void destroy() {
}
......
......@@ -139,22 +139,24 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
public void destroy() {
// in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every
// time when client.close() is called, counter counts down once, and when counter reaches zero, client will be
// closed.
// 忽略,若已经销毁
if (super.isDestroyed()) {
return;
} else {
// double check to avoid dup close
// 双重锁校验,避免已经关闭
destroyLock.lock();
try {
if (super.isDestroyed()) {
return;
}
// 标记关闭
super.destroy();
// 移除出 `invokers`
if (invokers != null) {
invokers.remove(this);
}
// 关闭 ExchangeClient 们
for (ExchangeClient client : clients) {
try {
client.close(ConfigUtils.getServerShutdownTimeout());
......@@ -162,8 +164,8 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
logger.warn(t.getMessage(), t);
}
}
} finally {
// 释放锁
destroyLock.unlock();
}
}
......
......@@ -517,6 +517,7 @@ public class DubboProtocol extends AbstractProtocol {
@SuppressWarnings("Duplicates")
@Override
public void destroy() {
// 销毁所有 ExchangeServer
for (String key : new ArrayList<String>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
......@@ -531,6 +532,7 @@ public class DubboProtocol extends AbstractProtocol {
}
}
// 销毁所有 ExchangeClient
for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
......@@ -538,13 +540,13 @@ public class DubboProtocol extends AbstractProtocol {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close(ConfigUtils.getServerShutdownTimeout());
client.close(ConfigUtils.getServerShutdownTimeout()); // 销毁
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
// 销毁所有幽灵 ExchangeClient
for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
ExchangeClient client = ghostClientMap.remove(key);
if (client != null) {
......@@ -552,13 +554,15 @@ public class DubboProtocol extends AbstractProtocol {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close(ConfigUtils.getServerShutdownTimeout());
client.close(ConfigUtils.getServerShutdownTimeout()); // 销毁
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
// 【TODO 8033】 参数回调
stubServiceMethodsMap.clear();
super.destroy();
}
}
\ No newline at end of file
......@@ -159,6 +159,7 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
@Override
public void close(int timeout) {
// 防止client被关闭多次. 在 connect per jvm 的情况下,client.close 方法会调用计数器-1,当计数器小于等于0的情况下,才真正关闭
if (refenceCount.decrementAndGet() <= 0) {
// 关闭 `client`
if (timeout == 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册