From a9bc38d7bb1b36d19459d4f0f59ecfe87b2e9fbf Mon Sep 17 00:00:00 2001 From: kimi Date: Mon, 9 Apr 2012 05:31:22 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=96=B9=E6=B3=95=E7=9A=84?= =?UTF-8?q?=E8=AE=BF=E9=97=AE=E4=BF=AE=E9=A5=B0=E7=AC=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@1515 1a56cb94-b969-4eaa-88fa-be21384802f2 --- .../remoting/transport/AbstractClient.java | 262 +++++++++--------- 1 file changed, 131 insertions(+), 131 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java index d0006020d..b500850a0 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/AbstractClient.java @@ -1,45 +1,45 @@ -/* - * Copyright 1999-2011 Alibaba Group. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alibaba.dubbo.remoting.transport; -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import com.alibaba.dubbo.common.Constants; -import com.alibaba.dubbo.common.URL; -import com.alibaba.dubbo.common.Version; -import com.alibaba.dubbo.common.logger.Logger; -import com.alibaba.dubbo.common.logger.LoggerFactory; -import com.alibaba.dubbo.common.utils.ExecutorUtil; -import com.alibaba.dubbo.common.utils.NamedThreadFactory; -import com.alibaba.dubbo.common.utils.NetUtils; -import com.alibaba.dubbo.remoting.Channel; -import com.alibaba.dubbo.remoting.ChannelHandler; -import com.alibaba.dubbo.remoting.Client; -import com.alibaba.dubbo.remoting.RemotingException; -import com.alibaba.dubbo.remoting.transport.dispather.ChannelHandlers; -import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.Version; +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.utils.ExecutorUtil; +import com.alibaba.dubbo.common.utils.NamedThreadFactory; +import com.alibaba.dubbo.common.utils.NetUtils; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.ChannelHandler; +import com.alibaba.dubbo.remoting.Client; +import com.alibaba.dubbo.remoting.RemotingException; +import com.alibaba.dubbo.remoting.transport.dispather.ChannelHandlers; +import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler; /** * AbstractClient @@ -47,8 +47,8 @@ import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler; * @author qian.lei * @author chao.liuc */ -public abstract class AbstractClient extends AbstractEndpoint implements Client { - +public abstract class AbstractClient extends AbstractEndpoint implements Client { + private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); protected static final String CLIENT_THREAD_POOL_NAME ="DubboClientHandler"; @@ -61,39 +61,39 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client private volatile ScheduledFuture reconnectExecutorFuture = null; - protected volatile ExecutorService executor; - - private final boolean send_reconnect ; - - private final AtomicInteger reconnect_count = new AtomicInteger(0); - - //重连的error日志是否已经被调用过. - private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false) ; - - //重连warning的间隔.(waring多少次之后,warning一次) //for test - private final int reconnect_warning_period ; - - //the last successed connected time - private long lastConnectedTime = System.currentTimeMillis(); - + protected volatile ExecutorService executor; + + private final boolean send_reconnect ; + + private final AtomicInteger reconnect_count = new AtomicInteger(0); + + //重连的error日志是否已经被调用过. + private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false) ; + + //重连warning的间隔.(waring多少次之后,warning一次) //for test + private final int reconnect_warning_period ; + + //the last successed connected time + private long lastConnectedTime = System.currentTimeMillis(); + private final long shutdown_timeout ; public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { - super(url, handler); - - send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); - - shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); - - //默认重连间隔2s,1800表示1小时warning一次. - reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); + super(url, handler); + + send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); + + shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); + + //默认重连间隔2s,1800表示1小时warning一次. + reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); try { doOpen(); } catch (Throwable t) { close(); - throw new RemotingException(url.toInetSocketAddress(), null, + throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } @@ -104,7 +104,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); } } catch (RemotingException t) { - if (url.getParameter(Constants.CHECK_KEY, true)) { + if (url.getParameter(Constants.CHECK_KEY, true)) { close(); throw t; } else { @@ -113,7 +113,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client } } catch (Throwable t){ close(); - throw new RemotingException(url.toInetSocketAddress(), null, + throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } @@ -121,12 +121,12 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client if (handler instanceof WrappedChannelHandler ){ executor = ((WrappedChannelHandler)handler).getExecutor(); } - } - - protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){ - url = url.addParameterIfAbsent(Constants.THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME) - .addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); - return ChannelHandlers.wrap(handler, url); + } + + protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){ + url = url.addParameterIfAbsent(Constants.THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME) + .addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); + return ChannelHandlers.wrap(handler, url); } /** @@ -136,56 +136,56 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client //reconnect=false to close reconnect int reconnect = getReconnectParam(getUrl()); if(reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())){ - Runnable connectStatusCheckCommand = new Runnable() { + Runnable connectStatusCheckCommand = new Runnable() { public void run() { try { if (! isConnected()) { connect(); - } else { - lastConnectedTime = System.currentTimeMillis(); + } else { + lastConnectedTime = System.currentTimeMillis(); + } + } catch (Throwable t) { + String errorMsg = "client reconnect to "+getUrl().getAddress()+" find error . url: "+ getUrl(); + // wait registry sync provider list + if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout){ + if (!reconnect_error_log_flag.get()){ + reconnect_error_log_flag.set(true); + logger.error(errorMsg, t); + return ; + } + } + if ( reconnect_count.getAndIncrement() % reconnect_warning_period == 0){ + logger.warn(errorMsg, t); } - } catch (Throwable t) { - String errorMsg = "client reconnect to "+getUrl().getAddress()+" find error . url: "+ getUrl(); - // wait registry sync provider list - if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout){ - if (!reconnect_error_log_flag.get()){ - reconnect_error_log_flag.set(true); - logger.error(errorMsg, t); - return ; - } - } - if ( reconnect_count.getAndIncrement() % reconnect_warning_period == 0){ - logger.warn(errorMsg, t); - } } } }; reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); } - } - - /** - * @param url - * @return 0-false - */ - private static int getReconnectParam(URL url){ - int reconnect ; - String param = url.getParameter(Constants.RECONNECT_KEY); - if (param == null || param.length()==0 || "true".equalsIgnoreCase(param)){ - reconnect = Constants.DEFAULT_RECONNECT_PERIOD; - }else if ("false".equalsIgnoreCase(param)){ - reconnect = 0; - } else { - try{ - reconnect = Integer.parseInt(param); - }catch (Exception e) { - throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param); - } - if(reconnect < 0){ - throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param); - } - } - return reconnect; + } + + /** + * @param url + * @return 0-false + */ + private static int getReconnectParam(URL url){ + int reconnect ; + String param = url.getParameter(Constants.RECONNECT_KEY); + if (param == null || param.length()==0 || "true".equalsIgnoreCase(param)){ + reconnect = Constants.DEFAULT_RECONNECT_PERIOD; + }else if ("false".equalsIgnoreCase(param)){ + reconnect = 0; + } else { + try{ + reconnect = Integer.parseInt(param); + }catch (Exception e) { + throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param); + } + if(reconnect < 0){ + throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param); + } + } + return reconnect; } private synchronized void destroyConnectStatusCheckCommand(){ @@ -210,7 +210,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client public InetSocketAddress getRemoteAddress() { Channel channel = getChannel(); if (channel == null) - return getUrl().toInetSocketAddress(); + return getUrl().toInetSocketAddress(); return channel.getRemoteAddress(); } @@ -256,11 +256,11 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client return channel.hasAttribute(key); } - public void send(Object message, boolean sent) throws RemotingException { - if (send_reconnect && !isConnected()){ - connect(); - } - Channel channel = getChannel(); + public void send(Object message, boolean sent) throws RemotingException { + if (send_reconnect && !isConnected()){ + connect(); + } + Channel channel = getChannel(); //TODO getChannel返回的状态是否包含null需要改进 if (channel == null || ! channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); @@ -268,7 +268,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client channel.send(message, sent); } - private void connect() throws RemotingException { + protected void connect() throws RemotingException { connectLock.lock(); try { if (isConnected()) { @@ -280,14 +280,14 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: Connect wait timeout: " + getTimeout() + "ms."); - } else { - if (logger.isInfoEnabled()){ - logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " - + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() - + ", channel is " + this.getChannel()); - } - } - reconnect_count.set(0); + } else { + if (logger.isInfoEnabled()){ + logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + + ", channel is " + this.getChannel()); + } + } + reconnect_count.set(0); reconnect_error_log_flag.set(false); } catch (RemotingException e) { throw e; @@ -295,9 +295,9 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: " + e.getMessage(), e); - } finally { + } finally { connectLock.unlock(); - } + } } public void disconnect() { -- GitLab