Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
thythm
ribbon
提交
bccaafdd
R
ribbon
项目概览
thythm
/
ribbon
与 Fork 源项目一致
从无法访问的项目Fork
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
ribbon
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
bccaafdd
编写于
4月 24, 2014
作者:
A
Allen Wang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
javadoc updates. minor clean up.
上级
144c6df9
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
110 addition
and
33 deletion
+110
-33
ribbon-loadbalancer/src/main/java/com/netflix/client/AbstractLoadBalancerAwareClient.java
...a/com/netflix/client/AbstractLoadBalancerAwareClient.java
+2
-15
ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/ClientCallableProvider.java
...java/com/netflix/loadbalancer/ClientCallableProvider.java
+23
-1
ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerContext.java
...in/java/com/netflix/loadbalancer/LoadBalancerContext.java
+1
-1
ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerExecutor.java
...n/java/com/netflix/loadbalancer/LoadBalancerExecutor.java
+84
-16
未找到文件。
ribbon-loadbalancer/src/main/java/com/netflix/client/AbstractLoadBalancerAwareClient.java
浏览文件 @
bccaafdd
...
...
@@ -17,31 +17,20 @@
*/
package
com.netflix.client
;
import
java.net.URI
;
import
java.util.concurrent.Callable
;
import
java.util.concurrent.TimeUnit
;
import
static
com
.
netflix
.
loadbalancer
.
LoadBalancerExecutor
.
CallableToObservable
.
toObsevableProvider
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.net.URI
;
import
com.google.common.base.Preconditions
;
import
com.netflix.client.config.CommonClientConfigKey
;
import
com.netflix.client.config.IClientConfig
;
import
com.netflix.loadbalancer.AbstractLoadBalancer
;
import
com.netflix.loadbalancer.AvailabilityFilteringRule
;
import
com.netflix.loadbalancer.ClientCallableProvider
;
import
com.netflix.loadbalancer.ILoadBalancer
;
import
com.netflix.loadbalancer.LoadBalancerExecutor
;
import
com.netflix.loadbalancer.LoadBalancerStats
;
import
com.netflix.loadbalancer.Server
;
import
com.netflix.loadbalancer.ServerStats
;
import
com.netflix.servo.monitor.Monitors
;
import
com.netflix.servo.monitor.Stopwatch
;
import
com.netflix.servo.monitor.Timer
;
import
com.netflix.utils.RxUtils
;
import
static
com
.
netflix
.
loadbalancer
.
LoadBalancerExecutor
.
CallableToObservable
.
toObsevableProvider
;
/**
* Abstract class that provides the integration of client with load balancers.
*
...
...
@@ -51,8 +40,6 @@ import static com.netflix.loadbalancer.LoadBalancerExecutor.CallableToObservable
public
abstract
class
AbstractLoadBalancerAwareClient
<
S
extends
ClientRequest
,
T
extends
IResponse
>
extends
LoadBalancerExecutor
implements
IClient
<
S
,
T
>,
IClientConfigAware
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
AbstractLoadBalancerAwareClient
.
class
);
public
AbstractLoadBalancerAwareClient
(
ILoadBalancer
lb
)
{
super
(
lb
);
}
...
...
ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/ClientCallableProvider.java
浏览文件 @
bccaafdd
/*
*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
com.netflix.loadbalancer
;
/**
* An interface that provides API to be used by {@link LoadBalancerExecutor} to execute a task on a server.
*
* @author awang
*
*/
public
interface
ClientCallableProvider
<
T
>
{
public
T
executeOnServer
(
Server
server
)
throws
Exception
;
...
...
ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerContext.java
浏览文件 @
bccaafdd
...
...
@@ -118,7 +118,7 @@ public class LoadBalancerContext implements IClientConfigAware {
if
(
tracer
==
null
)
{
synchronized
(
this
)
{
if
(
tracer
==
null
)
{
tracer
=
Monitors
.
newTimer
(
clientName
+
"_
Opera
tionTimer"
,
TimeUnit
.
MILLISECONDS
);
tracer
=
Monitors
.
newTimer
(
clientName
+
"_
LoadBalancerExecu
tionTimer"
,
TimeUnit
.
MILLISECONDS
);
}
}
}
...
...
ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerExecutor.java
浏览文件 @
bccaafdd
...
...
@@ -25,6 +25,14 @@ import com.netflix.client.config.IClientConfig;
import
com.netflix.servo.monitor.Stopwatch
;
import
com.netflix.utils.RxUtils
;
/**
* Provides APIs to execute and retry tasks on a server chosen by the associated load balancer.
* With appropriate {@link RetryHandler}, it will also retry on one or more different servers.
*
*
* @author awang
*
*/
public
class
LoadBalancerExecutor
extends
LoadBalancerContext
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
LoadBalancerExecutor
.
class
);
...
...
@@ -91,10 +99,9 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
int
maxRetries
=
callErrorHandler
.
getMaxRetriesOnSameServer
();
boolean
shouldRetry
=
maxRetries
>
0
&&
callErrorHandler
.
isRetriableException
(
error
,
true
);
final
Throwable
finalThrowable
;
// URI uri = request.getUri();
if
(
shouldRetry
&&
!
handleSameServerRetry
(
server
,
counter
.
incrementAndGet
(),
maxRetries
,
error
))
{
finalThrowable
=
new
ClientException
(
ClientException
.
ErrorType
.
NUMBEROF_RETRIES_EXEEDED
,
"N
UMBEROFRETRIESEXEEDED:
"
+
maxRetries
+
" retries, while making a call for: "
+
server
,
error
);
"N
umber of retries exceeded max
"
+
maxRetries
+
" retries, while making a call for: "
+
server
,
error
);
shouldRetry
=
false
;
}
else
{
finalThrowable
=
error
;
...
...
@@ -122,7 +129,7 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
@Override
public
Observable
<
T
>
call
(
Throwable
t1
)
{
logger
.
debug
(
"Get error during retry on next server"
,
t1
);
logger
.
debug
(
"Get error
{}
during retry on next server"
,
t1
);
int
maxRetriesNextServer
=
callErrorHandler
.
getMaxRetriesOnNextServer
();
boolean
shouldRetry
=
maxRetriesNextServer
>
0
&&
callErrorHandler
.
isRetriableException
(
t1
,
false
);
final
Throwable
finalThrowable
;
...
...
@@ -147,9 +154,39 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
}
/**
* Execute a task on a server chosen by load balancer with possible retries. If there are any errors that are indicated as
* retriable by the {@link RetryHandler}, they will be consumed internally. If number of retries has
* exceeds the maximal allowed, a final error will be thrown. Otherwise, the first successful
* result during execution and retries will be returned.
*
* @param clientCallableProvider interface that provides the logic to execute network call synchronously with a given {@link Server}
* @throws Exception If any exception happens in the exception
*/
public
<
T
>
T
executeWithLoadBalancer
(
final
ClientCallableProvider
<
T
>
clientCallableProvider
,
RetryHandler
retryHandler
)
throws
Exception
{
return
executeWithLoadBalancer
(
clientCallableProvider
,
null
,
retryHandler
,
null
);
}
/**
* Retry execution with load balancer with the given {@link ClientCallableProvider} that provides the logic to
* execute network call synchronously with a given {@link Server}.
* Execute a task on a server chosen by load balancer with possible retries. If there are any errors that are indicated as
* retriable by the {@link RetryHandler}, they will be consumed internally. If number of retries has
* exceeds the maximal allowed, a final error will be thrown. Otherwise, the first successful
* result during execution and retries will be returned.
*
* @param clientCallableProvider interface that provides the logic to execute network call synchronously with a given {@link Server}
* @throws Exception If any exception happens in the exception
*/
public
<
T
>
T
executeWithLoadBalancer
(
final
ClientCallableProvider
<
T
>
clientCallableProvider
)
throws
Exception
{
return
executeWithLoadBalancer
(
clientCallableProvider
,
null
,
null
,
null
);
}
/**
* Execute a task on a server chosen by load balancer with possible retries. If there are any errors that are indicated as
* retriable by the {@link RetryHandler}, they will be consumed internally. If number of retries has
* exceeds the maximal allowed, a final error will be thrown. Otherwise, the first successful
* result during execution and retries will be returned.
*
* @param clientCallableProvider interface that provides the logic to execute network call synchronously with a given {@link Server}
* @param loadBalancerURI An optional URI that may contain a real host name and port to use as a fallback to the {@link LoadBalancerExecutor}
...
...
@@ -161,29 +198,48 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
* @param loadBalancerKey An optional key passed to the load balancer to determine which server to return.
* @throws Exception If any exception happens in the exception
*/
p
ublic
<
T
>
T
executeWithLoadBalancer
(
final
ClientCallableProvider
<
T
>
clientCallableProvider
,
@Nullable
final
URI
loadBalancerURI
,
p
rotected
<
T
>
T
executeWithLoadBalancer
(
final
ClientCallableProvider
<
T
>
clientCallableProvider
,
@Nullable
final
URI
loadBalancerURI
,
@Nullable
final
RetryHandler
retryHandler
,
@Nullable
final
Object
loadBalancerKey
)
throws
Exception
{
return
RxUtils
.
getSingleValueWithRealErrorCause
(
executeWithLoadBalancer
(
CallableToObservable
.
toObsevableProvider
(
clientCallableProvider
),
loadBalancerURI
,
retryHandler
,
loadBalancerKey
));
}
public
<
T
>
Observable
<
T
>
executeWithLoadBalancer
(
final
ClientObservableProvider
<
T
>
clientObservableProvider
,
@Nullable
final
RetryHandler
retryHandler
,
@Nullable
final
Object
loadBalancerKey
)
{
return
executeWithLoadBalancer
(
clientObservableProvider
,
null
,
retryHandler
,
loadBalancerKey
);
}
/**
* Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
* If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
* function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
* exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
* result during execution and retries will be emitted.
*
* @param clientObservableProvider interface that provides the logic to execute network call asynchronously with a given {@link Server}
* @param retryHandler an optional handler to determine the retry logic of the {@link LoadBalancerExecutor}. If null, the default {@link RetryHandler}
* of this {@link LoadBalancerExecutor} will be used.
*/
public
<
T
>
Observable
<
T
>
executeWithLoadBalancer
(
final
ClientObservableProvider
<
T
>
clientObservableProvider
,
@Nullable
final
RetryHandler
retryHandler
)
{
return
executeWithLoadBalancer
(
clientObservableProvider
,
null
,
retryHandler
,
null
);
}
/**
* Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
* If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
* function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
* exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
* result during execution and retries will be emitted.
*
* @param clientObservableProvider interface that provides the logic to execute network call synchronously with a given {@link Server}
*/
public
<
T
>
Observable
<
T
>
executeWithLoadBalancer
(
final
ClientObservableProvider
<
T
>
clientObservableProvider
)
{
return
executeWithLoadBalancer
(
clientObservableProvider
,
null
,
new
DefaultLoadBalancerRetryHandler
(),
null
);
}
/**
* Create an {@link Observable} that retries execution with load balancer with the given {@link ClientObservableProvider} that provides the logic to
* execute network call asynchronously with a given {@link Server}.
* Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
* If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
* function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If number of retries has
* exceeds the maximal allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
* result during execution and retries will be emitted.
*
* @param clientObservableProvider interface that provides the logic to execute network call asynchronously with a given {@link Server}
* @param loadBalancerURI An optional URI that may contain a real host name and port to be used by {@link LoadBalancerExecutor}
...
...
@@ -215,13 +271,24 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
return
observable
.
onErrorResumeNext
(
retryNextServerFunc
);
}
/**
* Gets the {@link Observable} that represents the result of executing on a server, after possible retries as dictated by
* {@link RetryHandler}. During retry, any errors that are retriable are consumed by the function and will not be observed
* by the external {@link Observer}. If number of retries exceeds the maximal retries allowed on one server, a final error will
* be emitted by the returned {@link Observable}.
*/
protected
<
T
>
Observable
<
T
>
retrySameServer
(
final
Server
server
,
final
ClientObservableProvider
<
T
>
clientObservableProvider
,
final
RetryHandler
errorHandler
)
{
final
ServerStats
serverStats
=
getServerStats
(
server
);
OnSubscribeFunc
<
T
>
onSubscribe
=
new
OnSubscribeFunc
<
T
>()
{
@Override
public
Subscription
onSubscribe
(
final
Observer
<?
super
T
>
t1
)
{
final
ServerStats
serverStats
=
getServerStats
(
server
);
noteOpenConnection
(
serverStats
);
final
Stopwatch
tracer
=
getExecuteTracer
().
start
();
/*
* A delegate Observer that observes the execution result
* and records load balancer related statistics before
* sending the same result to the external Observer
*/
Observer
<
T
>
delegate
=
new
Observer
<
T
>()
{
private
volatile
T
entity
;
@Override
...
...
@@ -232,14 +299,15 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
@Override
public
void
onError
(
Throwable
e
)
{
logger
.
debug
(
"Got error {} when executed on server {}"
,
e
,
server
);
recordStats
(
entity
,
e
);
t1
.
onError
(
e
);
}
@Override
public
void
onNext
(
T
args
)
{
entity
=
args
;
t1
.
onNext
(
args
);
public
void
onNext
(
T
obj
)
{
entity
=
obj
;
t1
.
onNext
(
obj
);
}
private
void
recordStats
(
Object
entity
,
Throwable
exception
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录