提交 a9bc38d7 编写于 作者: K kimi

修改方法的访问修饰符

git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@1515 1a56cb94-b969-4eaa-88fa-be21384802f2
上级 5c537f50
/* /*
* Copyright 1999-2011 Alibaba Group. * Copyright 1999-2011 Alibaba Group.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.alibaba.dubbo.remoting.transport; package com.alibaba.dubbo.remoting.transport;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version; import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ExecutorUtil; import com.alibaba.dubbo.common.utils.ExecutorUtil;
import com.alibaba.dubbo.common.utils.NamedThreadFactory; import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.remoting.Channel; import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler; import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Client; import com.alibaba.dubbo.remoting.Client;
import com.alibaba.dubbo.remoting.RemotingException; import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.transport.dispather.ChannelHandlers; import com.alibaba.dubbo.remoting.transport.dispather.ChannelHandlers;
import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler; import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler;
/** /**
* AbstractClient * AbstractClient
...@@ -47,8 +47,8 @@ import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler; ...@@ -47,8 +47,8 @@ import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler;
* @author qian.lei * @author qian.lei
* @author chao.liuc * @author chao.liuc
*/ */
public abstract class AbstractClient extends AbstractEndpoint implements Client { public abstract class AbstractClient extends AbstractEndpoint implements Client {
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
protected static final String CLIENT_THREAD_POOL_NAME ="DubboClientHandler"; protected static final String CLIENT_THREAD_POOL_NAME ="DubboClientHandler";
...@@ -61,39 +61,39 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -61,39 +61,39 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
private volatile ScheduledFuture<?> reconnectExecutorFuture = null; private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
protected volatile ExecutorService executor; protected volatile ExecutorService executor;
private final boolean send_reconnect ; private final boolean send_reconnect ;
private final AtomicInteger reconnect_count = new AtomicInteger(0); private final AtomicInteger reconnect_count = new AtomicInteger(0);
//重连的error日志是否已经被调用过. //重连的error日志是否已经被调用过.
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false) ; private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false) ;
//重连warning的间隔.(waring多少次之后,warning一次) //for test //重连warning的间隔.(waring多少次之后,warning一次) //for test
private final int reconnect_warning_period ; private final int reconnect_warning_period ;
//the last successed connected time //the last successed connected time
private long lastConnectedTime = System.currentTimeMillis(); private long lastConnectedTime = System.currentTimeMillis();
private final long shutdown_timeout ; private final long shutdown_timeout ;
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler); super(url, handler);
send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
//默认重连间隔2s,1800表示1小时warning一次. //默认重连间隔2s,1800表示1小时warning一次.
reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);
try { try {
doOpen(); doOpen();
} catch (Throwable t) { } catch (Throwable t) {
close(); close();
throw new RemotingException(url.toInetSocketAddress(), null, throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
} }
...@@ -104,7 +104,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -104,7 +104,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
} }
} catch (RemotingException t) { } catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) { if (url.getParameter(Constants.CHECK_KEY, true)) {
close(); close();
throw t; throw t;
} else { } else {
...@@ -113,7 +113,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -113,7 +113,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
} }
} catch (Throwable t){ } catch (Throwable t){
close(); close();
throw new RemotingException(url.toInetSocketAddress(), null, throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
} }
...@@ -121,12 +121,12 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -121,12 +121,12 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
if (handler instanceof WrappedChannelHandler ){ if (handler instanceof WrappedChannelHandler ){
executor = ((WrappedChannelHandler)handler).getExecutor(); executor = ((WrappedChannelHandler)handler).getExecutor();
} }
} }
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){ protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){
url = url.addParameterIfAbsent(Constants.THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME) url = url.addParameterIfAbsent(Constants.THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); .addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url); return ChannelHandlers.wrap(handler, url);
} }
/** /**
...@@ -136,56 +136,56 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -136,56 +136,56 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
//reconnect=false to close reconnect //reconnect=false to close reconnect
int reconnect = getReconnectParam(getUrl()); int reconnect = getReconnectParam(getUrl());
if(reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())){ if(reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())){
Runnable connectStatusCheckCommand = new Runnable() { Runnable connectStatusCheckCommand = new Runnable() {
public void run() { public void run() {
try { try {
if (! isConnected()) { if (! isConnected()) {
connect(); connect();
} else { } else {
lastConnectedTime = System.currentTimeMillis(); lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
String errorMsg = "client reconnect to "+getUrl().getAddress()+" find error . url: "+ getUrl();
// wait registry sync provider list
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout){
if (!reconnect_error_log_flag.get()){
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return ;
}
}
if ( reconnect_count.getAndIncrement() % reconnect_warning_period == 0){
logger.warn(errorMsg, t);
} }
} catch (Throwable t) {
String errorMsg = "client reconnect to "+getUrl().getAddress()+" find error . url: "+ getUrl();
// wait registry sync provider list
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout){
if (!reconnect_error_log_flag.get()){
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return ;
}
}
if ( reconnect_count.getAndIncrement() % reconnect_warning_period == 0){
logger.warn(errorMsg, t);
}
} }
} }
}; };
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
} }
} }
/** /**
* @param url * @param url
* @return 0-false * @return 0-false
*/ */
private static int getReconnectParam(URL url){ private static int getReconnectParam(URL url){
int reconnect ; int reconnect ;
String param = url.getParameter(Constants.RECONNECT_KEY); String param = url.getParameter(Constants.RECONNECT_KEY);
if (param == null || param.length()==0 || "true".equalsIgnoreCase(param)){ if (param == null || param.length()==0 || "true".equalsIgnoreCase(param)){
reconnect = Constants.DEFAULT_RECONNECT_PERIOD; reconnect = Constants.DEFAULT_RECONNECT_PERIOD;
}else if ("false".equalsIgnoreCase(param)){ }else if ("false".equalsIgnoreCase(param)){
reconnect = 0; reconnect = 0;
} else { } else {
try{ try{
reconnect = Integer.parseInt(param); reconnect = Integer.parseInt(param);
}catch (Exception e) { }catch (Exception e) {
throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param); throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param);
} }
if(reconnect < 0){ if(reconnect < 0){
throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param); throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:"+param);
} }
} }
return reconnect; return reconnect;
} }
private synchronized void destroyConnectStatusCheckCommand(){ private synchronized void destroyConnectStatusCheckCommand(){
...@@ -210,7 +210,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -210,7 +210,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
Channel channel = getChannel(); Channel channel = getChannel();
if (channel == null) if (channel == null)
return getUrl().toInetSocketAddress(); return getUrl().toInetSocketAddress();
return channel.getRemoteAddress(); return channel.getRemoteAddress();
} }
...@@ -256,11 +256,11 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -256,11 +256,11 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
return channel.hasAttribute(key); return channel.hasAttribute(key);
} }
public void send(Object message, boolean sent) throws RemotingException { public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()){ if (send_reconnect && !isConnected()){
connect(); connect();
} }
Channel channel = getChannel(); Channel channel = getChannel();
//TODO getChannel返回的状态是否包含null需要改进 //TODO getChannel返回的状态是否包含null需要改进
if (channel == null || ! channel.isConnected()) { if (channel == null || ! channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
...@@ -268,7 +268,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -268,7 +268,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
channel.send(message, sent); channel.send(message, sent);
} }
private void connect() throws RemotingException { protected void connect() throws RemotingException {
connectLock.lock(); connectLock.lock();
try { try {
if (isConnected()) { if (isConnected()) {
...@@ -280,14 +280,14 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -280,14 +280,14 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms."); + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else { } else {
if (logger.isInfoEnabled()){ if (logger.isInfoEnabled()){
logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel()); + ", channel is " + this.getChannel());
} }
} }
reconnect_count.set(0); reconnect_count.set(0);
reconnect_error_log_flag.set(false); reconnect_error_log_flag.set(false);
} catch (RemotingException e) { } catch (RemotingException e) {
throw e; throw e;
...@@ -295,9 +295,9 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client ...@@ -295,9 +295,9 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e); + ", cause: " + e.getMessage(), e);
} finally { } finally {
connectLock.unlock(); connectLock.unlock();
} }
} }
public void disconnect() { public void disconnect() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册