Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
ad47c904
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
ad47c904
编写于
6月 22, 2020
作者:
E
Evan
提交者:
GitHub
6月 22, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update the kubernetes client verion to 8.0.0 (#4844)
上级
08897ce5
变更
20
展开全部
隐藏空白更改
内联
并排
Showing
20 changed file
with
453 addition
and
716 deletion
+453
-716
dist-material/release-docs/LICENSE
dist-material/release-docs/LICENSE
+13
-4
oap-server/pom.xml
oap-server/pom.xml
+1
-1
oap-server/server-bootstrap/src/main/resources/application.yml
...erver/server-bootstrap/src/main/resources/application.yml
+0
-1
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
...ster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
+4
-33
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
...er/plugin/kubernetes/ClusterModuleKubernetesProvider.java
+5
-7
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
...kywalking/oap/server/cluster/plugin/kubernetes/Event.java
+0
-46
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
...rver/cluster/plugin/kubernetes/KubernetesCoordinator.java
+42
-94
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java
.../cluster/plugin/kubernetes/NamespacedPodListInformer.java
+106
-0
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
...g/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
+0
-32
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/UidEnvSupplier.java
.../oap/server/cluster/plugin/kubernetes/UidEnvSupplier.java
+1
-1
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
...lugin/kubernetes/dependencies/NamespacedPodListWatch.java
+0
-114
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
...lugin/kubernetes/ClusterModuleKubernetesProviderTest.java
+28
-23
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
.../cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+69
-115
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
.../server/cluster/plugin/kubernetes/fixture/PlainWatch.java
+0
-89
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java
...ing/oap/server/receiver/envoy/als/DependencyResource.java
+3
-4
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java
...che/skywalking/oap/server/receiver/envoy/als/Fetcher.java
+5
-6
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
...ver/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
+124
-93
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java
...oap/server/receiver/envoy/als/DependencyResourceTest.java
+6
-7
tools/dependencies/known-oap-backend-dependencies-es7.txt
tools/dependencies/known-oap-backend-dependencies-es7.txt
+22
-22
tools/dependencies/known-oap-backend-dependencies.txt
tools/dependencies/known-oap-backend-dependencies.txt
+24
-24
未找到文件。
dist-material/release-docs/LICENSE
浏览文件 @
ad47c904
...
...
@@ -267,7 +267,8 @@ The text of each license is the standard Apache 2.0 license.
Apache: commons-collections 3.2.2: https://github.com/apache/commons-collections, Apache 2.0
Apache: commons-configuration 1.8: https://github.com/apache/commons-configuration, Apache 2.0
Apache: commons-io 2.4: https://github.com/apache/commons-io, Apache 2.0
Apache: commons-compress 1.18: https://github.com/apache/commons-compress, Apache 2.0
Apache: commons-compress 1.19: https://github.com/apache/commons-compress, Apache 2.0
Apache: commons-collections4 4.1: https://mvnrepository.com/artifact/org.apache.commons/commons-collections4, Apache 2.0
Apache: tomcat 8.5.27: https://github.com/apache/tomcat/tree/trunk, Apache 2.0
Apache: freemarker 2.3.28: https://github.com/apache/freemarker, Apache 2.0
netty 5.5.0: https://github.com/netty/netty/blob/4.1/LICENSE.txt, Apache 2.0
...
...
@@ -304,7 +305,7 @@ The text of each license is the standard Apache 2.0 license.
HikariCP 3.1.0: https://github.com/brettwooldridge/HikariCP, Apache 2.0
zipkin 2.9.1: https://github.com/openzipkin/zipkin, Apache 2.0
sharding-jdbc-core 2.0.3: https://github.com/sharding-sphere/sharding-sphere, Apache 2.0
kubernetes-client
4
.0.0: https://github.com/kubernetes-client/java, Apache 2.0
kubernetes-client
8
.0.0: https://github.com/kubernetes-client/java, Apache 2.0
proto files from istio/istio: https://github.com/istio/istio Apache 2.0
proto files from istio/api: https://github.com/istio/api Apache 2.0
consul-client 1.2.6: https://github.com/rickfast/consul-client, Apache 2.0
...
...
@@ -327,6 +328,12 @@ The text of each license is the standard Apache 2.0 license.
moshi 1.5.0: https://github.com/square/moshi, Apache 2.0
logging-interceptor 3.13.1: https://github.com/square/okhttp/tree/master/okhttp-logging-interceptor, Apache 2.0
msgpack-core 0.8.16: https://github.com/msgpack/msgpack-java, Apache 2.0
sundr-codegen 0.2.10: https://mvnrepository.com/artifact/io.sundr/sundr-codegen, Apache 2.0
sundr-core 0.2.10: https://mvnrepository.com/artifact/io.sundr/sundr-core, Apache 2.0
swagger-annotations 1.5.22: https://mvnrepository.com/artifact/io.swagger.core.v3/swagger-annotations, Apache 2.0
resourcecify-annotations 0.21.0: https://mvnrepository.com/artifact/io.sundr/resourcecify-annotations, Apache 2.0
jose4j 0.7.0: https://mvnrepository.com/artifact/org.bitbucket.b_c/jose4j, Apache 2.0
converter-moshi 2.5.0: https://mvnrepository.com/artifact/com.squareup.retrofit2/converter-moshi, Apache 2.0
vavr 0.10.3: https://github.com/vavr-io/vavr, Apache 2.0
========================================================================
...
...
@@ -341,8 +348,9 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
GraphQL java 6.0: https://github.com/graphql-java/graphql-java , MIT
GraphQL Java Tools 4.3.0: https://github.com/graphql-java/graphql-java-tools , MIT
jopt-simple 5.0.2: https://github.com/jopt-simple/jopt-simple , MIT
bcpkix-jdk15on 1.55: http://www.bouncycastle.org/licence.html , MIT
bcprov-jdk15on 1.55: http://www.bouncycastle.org/licence.html , MIT
bcpkix-jdk15on 1.61: http://www.bouncycastle.org/licence.html , MIT
bcprov-jdk15on 1.61: http://www.bouncycastle.org/licence.html , MIT
bcprov-ext-jdk15on 1.61: http://www.bouncycastle.org/licence.html , MIT
minimal-json 0.9.5: https://github.com/ralfstx/minimal-json, MIT
checker-qual 2.8.1: https://github.com/typetools/checker-framework, MIT
influxdb-java 2.15: https://github.com/influxdata/influxdb-java, MIT
...
...
@@ -429,6 +437,7 @@ popper.js 1.14.7: https://github.com/FezVrasta/popper.js MIT
vue-datepicker-local 1.0.19: https://github.com/weifeiyue/vue-datepicker-local MIT
vue-js-modal 1.3.31: https://github.com/euvl/vue-js-modal MIT
lodash 4.17.15: https://github.com/lodash/lodash MIT
gson-fire 1.8.3: https://mvnrepository.com/artifact/io.gsonfire/gson-fire MIT
========================================
Apache 2.0 licenses
...
...
oap-server/pom.xml
浏览文件 @
ad47c904
...
...
@@ -65,7 +65,7 @@
<commons-io.version>
2.6
</commons-io.version>
<elasticsearch.version>
6.3.2
</elasticsearch.version>
<joda-time.version>
2.10.5
</joda-time.version>
<kubernetes.version>
4
.0.0
</kubernetes.version>
<kubernetes.version>
8
.0.0
</kubernetes.version>
<hikaricp.version>
3.1.0
</hikaricp.version>
<zipkin.version>
2.9.1
</zipkin.version>
<caffeine.version>
2.6.2
</caffeine.version>
...
...
oap-server/server-bootstrap/src/main/resources/application.yml
浏览文件 @
ad47c904
...
...
@@ -29,7 +29,6 @@ cluster:
schema
:
${SW_ZK_SCHEMA:digest}
# only support digest schema
expression
:
${SW_ZK_EXPRESSION:skywalking:skywalking}
kubernetes
:
watchTimeoutSeconds
:
${SW_CLUSTER_K8S_WATCH_TIMEOUT:60}
namespace
:
${SW_CLUSTER_K8S_NAMESPACE:default}
labelSelector
:
${SW_CLUSTER_K8S_LABEL:app=collector,release=skywalking}
uidEnvName
:
${SW_CLUSTER_K8S_UID:SKYWALKING_COLLECTOR_UID}
...
...
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesConfig.java
浏览文件 @
ad47c904
...
...
@@ -18,46 +18,17 @@
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
import
lombok.Getter
;
import
lombok.Setter
;
import
org.apache.skywalking.oap.server.library.module.ModuleConfig
;
/**
* The configuration of the module of cluster.kubernetes
*/
@Getter
@Setter
public
class
ClusterModuleKubernetesConfig
extends
ModuleConfig
{
private
int
watchTimeoutSeconds
;
private
String
namespace
;
private
String
labelSelector
;
private
String
uidEnvName
;
public
int
getWatchTimeoutSeconds
()
{
return
watchTimeoutSeconds
;
}
public
void
setWatchTimeoutSeconds
(
int
watchTimeoutSeconds
)
{
this
.
watchTimeoutSeconds
=
watchTimeoutSeconds
;
}
public
String
getNamespace
()
{
return
namespace
;
}
public
void
setNamespace
(
String
namespace
)
{
this
.
namespace
=
namespace
;
}
public
String
getLabelSelector
()
{
return
labelSelector
;
}
public
void
setLabelSelector
(
String
labelSelector
)
{
this
.
labelSelector
=
labelSelector
;
}
public
String
getUidEnvName
()
{
return
uidEnvName
;
}
public
void
setUidEnvName
(
String
uidEnvName
)
{
this
.
uidEnvName
=
uidEnvName
;
}
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
浏览文件 @
ad47c904
...
...
@@ -18,8 +18,7 @@
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.NamespacedPodListWatch
;
import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies.UidEnvSupplier
;
import
org.apache.skywalking.oap.server.core.CoreModule
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterModule
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterRegister
;
...
...
@@ -58,24 +57,23 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider {
@Override
public
void
prepare
()
throws
ServiceNotProvidedException
{
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
new
NamespacedPodListWatch
(
config
.
getNamespace
(),
config
.
getLabelSelector
(),
config
.
getWatchTimeoutSeconds
()),
new
UidEnvSupplier
(
config
.
getUidEnvName
())
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
config
);
this
.
registerServiceImplementation
(
ClusterRegister
.
class
,
coordinator
);
this
.
registerServiceImplementation
(
ClusterNodesQuery
.
class
,
coordinator
);
}
@Override
public
void
start
()
{
NamespacedPodListInformer
.
INFORMER
.
init
(
config
);
}
@Override
public
void
notifyAfterCompleted
()
{
coordinator
.
start
();
}
@Override
public
String
[]
requiredModules
()
{
return
new
String
[
0
]
;
return
new
String
[
]
{
CoreModule
.
NAME
}
;
}
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/Event.java
已删除
100644 → 0
浏览文件 @
08897ce5
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
/**
* The event of watch.
*/
public
class
Event
{
private
final
String
type
;
private
final
String
uid
;
private
final
String
host
;
public
Event
(
final
String
type
,
final
String
uid
,
final
String
host
)
{
this
.
type
=
type
;
this
.
uid
=
uid
;
this
.
host
=
host
;
}
String
getType
()
{
return
type
;
}
String
getUid
()
{
return
uid
;
}
String
getHost
()
{
return
host
;
}
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
浏览文件 @
ad47c904
...
...
@@ -18,21 +18,13 @@
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
import
com.google.common.util.concurrent.FutureCallback
;
import
com.google.common.util.concurrent.Futures
;
import
com.google.common.util.concurrent.ListenableFuture
;
import
com.google.common.util.concurrent.ListeningExecutorService
;
import
com.google.common.util.concurrent.MoreExecutors
;
import
com.google.common.util.concurrent.ThreadFactoryBuilder
;
import
java.util.ArrayList
;
import
io.kubernetes.client.openapi.models.V1ObjectMeta
;
import
io.kubernetes.client.openapi.models.V1Pod
;
import
io.kubernetes.client.openapi.models.V1PodStatus
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.Callable
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.function.Supplier
;
import
javax.annotation.Nullable
;
import
java.util.stream.Collectors
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.skywalking.oap.server.core.CoreModule
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterRegister
;
...
...
@@ -42,108 +34,64 @@ import org.apache.skywalking.oap.server.core.config.ConfigService;
import
org.apache.skywalking.oap.server.core.remote.client.Address
;
import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder
;
import
org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of
* {@link RemoteInstance}.
*/
@Slf4j
public
class
KubernetesCoordinator
implements
ClusterRegister
,
ClusterNodesQuery
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
KubernetesCoordinator
.
class
);
private
final
ModuleDefineHolder
manager
;
private
volatile
int
port
=
-
1
;
private
final
String
uid
;
private
final
Map
<
String
,
RemoteInstance
>
cache
=
new
ConcurrentHashMap
<>();
public
KubernetesCoordinator
(
final
ModuleDefineHolder
manager
,
final
ClusterModuleKubernetesConfig
config
)
{
this
.
uid
=
new
UidEnvSupplier
(
config
.
getUidEnvName
()).
get
();
this
.
manager
=
manager
;
}
private
final
ReusableWatch
<
Event
>
watch
;
@Override
public
List
<
RemoteInstance
>
queryRemoteNodes
()
{
private
volatile
int
port
=
-
1
;
List
<
V1Pod
>
pods
=
NamespacedPodListInformer
.
INFORMER
.
listPods
().
orElseGet
(
this
::
selfPod
)
;
KubernetesCoordinator
(
ModuleDefineHolder
manager
,
final
ReusableWatch
<
Event
>
watch
,
final
Supplier
<
String
>
uidSupplier
)
{
this
.
manager
=
manager
;
this
.
watch
=
watch
;
this
.
uid
=
uidSupplier
.
get
();
TelemetryRelatedContext
.
INSTANCE
.
setId
(
uid
);
}
if
(
log
.
isDebugEnabled
())
{
List
<
String
>
uidList
=
pods
.
stream
()
.
map
(
item
->
item
.
getMetadata
().
getUid
())
.
collect
(
Collectors
.
toList
());
log
.
debug
(
"[kubernetes cluster pods uid list]:{}"
,
uidList
.
toString
());
}
if
(
port
==
-
1
)
{
port
=
manager
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
ConfigService
.
class
).
getGRPCPort
();
}
return
pods
.
stream
()
.
map
(
pod
->
new
RemoteInstance
(
new
Address
(
pod
.
getStatus
().
getPodIP
(),
port
,
pod
.
getMetadata
().
getUid
().
equals
(
uid
))))
.
collect
(
Collectors
.
toList
());
public
void
start
()
{
ExecutorService
executorService
=
Executors
.
newSingleThreadExecutor
(
new
ThreadFactoryBuilder
().
setDaemon
(
true
)
.
setNameFormat
(
"Kubernetes-ApiServer-%s"
)
.
build
());
submitTask
(
MoreExecutors
.
listeningDecorator
(
executorService
),
executorService
);
}
@Override
public
void
registerRemote
(
RemoteInstance
remoteInstance
)
throws
ServiceRegisterException
{
public
void
registerRemote
(
final
RemoteInstance
remoteInstance
)
throws
ServiceRegisterException
{
this
.
port
=
remoteInstance
.
getAddress
().
getPort
();
TelemetryRelatedContext
.
INSTANCE
.
setId
(
remoteInstance
.
toString
());
}
private
void
submitTask
(
final
ListeningExecutorService
service
,
final
ExecutorService
executorService
)
{
watch
.
initOrReset
();
ListenableFuture
<?>
watchFuture
=
service
.
submit
(
newWatch
());
Futures
.
addCallback
(
watchFuture
,
new
FutureCallback
<
Object
>()
{
@Override
public
void
onSuccess
(
@Nullable
Object
ignored
)
{
submitTask
(
service
,
executorService
);
}
@Override
public
void
onFailure
(
@Nullable
Throwable
throwable
)
{
logger
.
debug
(
"Generate remote nodes error"
,
throwable
);
submitTask
(
service
,
executorService
);
}
},
executorService
);
}
private
Callable
<
Object
>
newWatch
()
{
return
()
->
{
generateRemoteNodes
();
return
null
;
};
}
private
List
<
V1Pod
>
selfPod
()
{
private
void
generateRemoteNodes
()
{
for
(
Event
event
:
watch
)
{
if
(
event
==
null
)
{
break
;
}
logger
.
debug
(
"Received event {} {}-{}"
,
event
.
getType
(),
event
.
getUid
(),
event
.
getHost
());
switch
(
event
.
getType
())
{
case
"ADDED"
:
case
"MODIFIED"
:
cache
.
put
(
event
.
getUid
(),
new
RemoteInstance
(
new
Address
(
event
.
getHost
(),
port
,
event
.
getUid
()
.
equals
(
this
.
uid
))));
break
;
case
"DELETED"
:
cache
.
remove
(
event
.
getUid
());
break
;
default
:
throw
new
RuntimeException
(
String
.
format
(
"Unknown event %s"
,
event
.
getType
()));
}
}
}
V1Pod
v1Pod
=
new
V1Pod
();
v1Pod
.
setMetadata
(
new
V1ObjectMeta
());
v1Pod
.
setStatus
(
new
V1PodStatus
());
v1Pod
.
getMetadata
().
setUid
(
uid
);
v1Pod
.
getStatus
().
setPodIP
(
"127.0.0.1"
);
return
Collections
.
singletonList
(
v1Pod
);
@Override
public
List
<
RemoteInstance
>
queryRemoteNodes
()
{
final
List
<
RemoteInstance
>
list
=
new
ArrayList
<>();
cache
.
values
().
forEach
(
instance
->
{
Address
address
=
instance
.
getAddress
();
if
(
port
==
-
1
)
{
logger
.
debug
(
"Query kubernetes remote, port hasn't init, try to init"
);
ConfigService
service
=
manager
.
find
(
CoreModule
.
NAME
).
provider
().
getService
(
ConfigService
.
class
);
port
=
service
.
getGRPCPort
();
logger
.
debug
(
"Query kubernetes remote, port is set at {}"
,
port
);
}
list
.
add
(
new
RemoteInstance
(
new
Address
(
address
.
getHost
(),
port
,
address
.
isSelf
())));
});
logger
.
debug
(
"Query kubernetes remote nodes: {}"
,
list
);
return
list
;
}
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java
0 → 100644
浏览文件 @
ad47c904
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
import
io.kubernetes.client.informer.SharedIndexInformer
;
import
io.kubernetes.client.informer.SharedInformerFactory
;
import
io.kubernetes.client.informer.cache.Lister
;
import
io.kubernetes.client.openapi.ApiClient
;
import
io.kubernetes.client.openapi.apis.CoreV1Api
;
import
io.kubernetes.client.openapi.models.V1Pod
;
import
io.kubernetes.client.openapi.models.V1PodList
;
import
io.kubernetes.client.util.Config
;
import
java.io.IOException
;
import
java.util.List
;
import
java.util.Objects
;
import
java.util.Optional
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.TimeUnit
;
import
java.util.stream.Collectors
;
import
lombok.extern.slf4j.Slf4j
;
@Slf4j
public
enum
NamespacedPodListInformer
{
/**
* contains remote collector instances
*/
INFORMER
;
private
Lister
<
V1Pod
>
podLister
;
private
SharedInformerFactory
factory
;
private
final
ExecutorService
executorService
=
Executors
.
newSingleThreadExecutor
(
r
->
{
Thread
thread
=
new
Thread
(
r
,
"SKYWALKING_KUBERNETES_CLUSTER_INFORMER"
);
thread
.
setDaemon
(
true
);
return
thread
;
});
{
Runtime
.
getRuntime
().
addShutdownHook
(
new
Thread
(()
->
{
if
(
Objects
.
nonNull
(
factory
))
{
factory
.
stopAllRegisteredInformers
();
}
}));
}
public
synchronized
void
init
(
ClusterModuleKubernetesConfig
podConfig
)
{
try
{
doStartPodInformer
(
podConfig
);
}
catch
(
IOException
e
)
{
log
.
error
(
"cannot connect with api server in kubernetes"
,
e
);
}
}
private
void
doStartPodInformer
(
ClusterModuleKubernetesConfig
podConfig
)
throws
IOException
{
ApiClient
apiClient
=
Config
.
defaultClient
();
apiClient
.
setHttpClient
(
apiClient
.
getHttpClient
().
newBuilder
().
readTimeout
(
0
,
TimeUnit
.
SECONDS
).
build
());
CoreV1Api
coreV1Api
=
new
CoreV1Api
(
apiClient
);
factory
=
new
SharedInformerFactory
(
executorService
);
SharedIndexInformer
<
V1Pod
>
podSharedIndexInformer
=
factory
.
sharedIndexInformerFor
(
params
->
coreV1Api
.
listNamespacedPodCall
(
podConfig
.
getNamespace
(),
null
,
null
,
null
,
null
,
podConfig
.
getLabelSelector
(),
Integer
.
MAX_VALUE
,
params
.
resourceVersion
,
params
.
timeoutSeconds
,
params
.
watch
,
null
),
V1Pod
.
class
,
V1PodList
.
class
);
factory
.
startAllRegisteredInformers
();
podLister
=
new
Lister
<>(
podSharedIndexInformer
.
getIndexer
());
}
public
Optional
<
List
<
V1Pod
>>
listPods
()
{
return
Optional
.
ofNullable
(
podLister
.
list
().
size
()
!=
0
?
podLister
.
list
()
.
stream
()
.
filter
(
item
->
"Running"
.
equalsIgnoreCase
(
item
.
getStatus
().
getPhase
()))
.
collect
(
Collectors
.
toList
())
:
null
);
}
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ReusableWatch.java
已删除
100644 → 0
浏览文件 @
08897ce5
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
/**
* This watch can init or reset internal state.
*
* @param <T> event of watch
*/
public
interface
ReusableWatch
<
T
>
extends
Iterable
<
T
>
{
/**
* Reset internal state.
*/
void
initOrReset
();
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/
dependencies/
UidEnvSupplier.java
→
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/UidEnvSupplier.java
浏览文件 @
ad47c904
...
...
@@ -16,7 +16,7 @@
*
*/
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
.dependencies
;
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
import
java.util.function.Supplier
;
...
...
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/dependencies/NamespacedPodListWatch.java
已删除
100644 → 0
浏览文件 @
08897ce5
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.dependencies
;
import
com.google.common.reflect.TypeToken
;
import
io.kubernetes.client.ApiClient
;
import
io.kubernetes.client.ApiException
;
import
io.kubernetes.client.Configuration
;
import
io.kubernetes.client.apis.CoreV1Api
;
import
io.kubernetes.client.models.V1Pod
;
import
io.kubernetes.client.util.Config
;
import
io.kubernetes.client.util.Watch
;
import
java.io.IOException
;
import
java.util.Iterator
;
import
java.util.Objects
;
import
java.util.concurrent.TimeUnit
;
import
java.util.function.Supplier
;
import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event
;
import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* Watch the api {@literal https://v1-9.docs.kubernetes.io/docs/reference/generated/kubernetes-api/v1.9/#watch-64}.
*/
public
class
NamespacedPodListWatch
implements
ReusableWatch
<
Event
>
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NamespacedPodListWatch
.
class
);
private
final
String
namespace
;
private
final
String
labelSelector
;
private
final
int
watchTimeoutSeconds
;
private
Watch
<
V1Pod
>
watch
;
public
NamespacedPodListWatch
(
final
String
namespace
,
final
String
labelSelector
,
final
int
watchTimeoutSeconds
)
{
this
.
namespace
=
namespace
;
this
.
labelSelector
=
labelSelector
;
this
.
watchTimeoutSeconds
=
watchTimeoutSeconds
;
}
@Override
public
void
initOrReset
()
{
ApiClient
client
;
try
{
client
=
Config
.
defaultClient
();
}
catch
(
IOException
e
)
{
throw
new
RuntimeException
(
e
.
getMessage
(),
e
);
}
client
.
getHttpClient
().
setReadTimeout
(
watchTimeoutSeconds
,
TimeUnit
.
SECONDS
);
Configuration
.
setDefaultApiClient
(
client
);
CoreV1Api
api
=
new
CoreV1Api
();
try
{
watch
=
Watch
.
createWatch
(
client
,
api
.
listNamespacedPodCall
(
namespace
,
null
,
null
,
null
,
null
,
labelSelector
,
Integer
.
MAX_VALUE
,
null
,
null
,
Boolean
.
TRUE
,
null
,
null
),
new
TypeToken
<
Watch
.
Response
<
V1Pod
>>()
{
}.
getType
());
}
catch
(
final
ApiException
e
)
{
logger
.
error
(
"code:{} header:{} body:{}"
,
e
.
getCode
(),
e
.
getResponseHeaders
(),
e
.
getResponseBody
());
throw
new
RuntimeException
(
e
.
getMessage
(),
e
);
}
}
@Override
public
Iterator
<
Event
>
iterator
()
{
final
Iterator
<
Watch
.
Response
<
V1Pod
>>
watchItr
=
watch
.
iterator
();
return
new
Iterator
<
Event
>()
{
@Override
public
boolean
hasNext
()
{
return
wrap
(
watchItr:
:
hasNext
,
false
);
}
@Override
public
Event
next
()
{
return
wrap
(()
->
{
final
Watch
.
Response
<
V1Pod
>
response
=
watchItr
.
next
();
return
new
Event
(
response
.
type
,
response
.
object
.
getMetadata
().
getUid
(),
response
.
object
.
getStatus
()
.
getPodIP
());
},
null
);
}
private
<
R
>
R
wrap
(
final
Supplier
<
R
>
action
,
final
R
defaultValue
)
{
Objects
.
requireNonNull
(
action
);
try
{
return
action
.
get
();
}
catch
(
final
Throwable
t
)
{
logger
.
trace
(
"Throwable"
,
t
);
try
{
watch
.
close
();
}
catch
(
IOException
e
)
{
logger
.
error
(
"Close watch error"
,
e
);
}
}
return
defaultValue
;
}
};
}
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
浏览文件 @
ad47c904
...
...
@@ -18,48 +18,53 @@
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
import
org.apache.skywalking.oap.server.core.CoreModule
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterModule
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery
;
import
org.apache.skywalking.oap.server.core.cluster.ClusterRegister
;
import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException
;
import
org.junit.Before
;
import
org.apache.skywalking.oap.server.library.module.ModuleConfig
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.powermock.core.classloader.annotations.PowerMockIgnore
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
static
org
.
hamcrest
.
core
.
Is
.
is
;
import
static
org
.
junit
.
Assert
.
assertSame
;
import
static
org
.
junit
.
Assert
.
assertThat
;
import
static
org
.
junit
.
Assert
.
assertArrayEquals
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
@RunWith
(
PowerMockRunner
.
class
)
@PowerMockIgnore
(
"javax.management.*"
)
public
class
ClusterModuleKubernetesProviderTest
{
private
ClusterModuleKubernetesProvider
provider
;
private
ClusterModuleKubernetesProvider
provider
=
new
ClusterModuleKubernetesProvider
()
;
@
Before
public
void
setUp
()
{
provider
=
new
ClusterModuleKubernetesProvider
(
);
@
Test
public
void
name
()
{
assertEquals
(
"kubernetes"
,
provider
.
name
()
);
}
@Test
public
void
assertNam
e
()
{
assert
That
(
provider
.
name
(),
is
(
"kubernetes"
));
public
void
modul
e
()
{
assert
Equals
(
ClusterModule
.
class
,
provider
.
module
(
));
}
@Test
public
void
assertModule
()
{
assertTrue
(
provider
.
module
().
isAssignableFrom
(
ClusterModule
.
class
));
public
void
createConfigBeanIfAbsent
()
{
ModuleConfig
moduleConfig
=
provider
.
createConfigBeanIfAbsent
();
assertTrue
(
moduleConfig
instanceof
ClusterModuleKubernetesConfig
);
}
@Test
public
void
assertCreateConfigBeanIfAbsent
()
{
assertTrue
(
ClusterModuleKubernetesConfig
.
class
.
isInstance
(
provider
.
createConfigBeanIfAbsent
())
);
public
void
prepare
()
throws
Exception
{
provider
.
prepare
(
);
}
@Test
public
void
assertPrepare
()
throws
ServiceNotProvidedException
{
provider
.
prepare
();
ClusterRegister
register
=
provider
.
getService
(
ClusterRegister
.
class
);
ClusterNodesQuery
query
=
provider
.
getService
(
ClusterNodesQuery
.
class
);
assertSame
(
register
,
query
);
assertTrue
(
KubernetesCoordinator
.
class
.
isInstance
(
register
));
public
void
notifyAfterCompleted
()
{
provider
.
notifyAfterCompleted
();
}
@Test
public
void
requiredModules
()
{
String
[]
modules
=
provider
.
requiredModules
();
assertArrayEquals
(
new
String
[]
{
CoreModule
.
NAME
},
modules
);
}
}
\ No newline at end of file
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
浏览文件 @
ad47c904
...
...
@@ -18,150 +18,104 @@
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes
;
import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch
;
import
io.kubernetes.client.openapi.models.V1ObjectMeta
;
import
io.kubernetes.client.openapi.models.V1Pod
;
import
io.kubernetes.client.openapi.models.V1PodStatus
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Optional
;
import
java.util.stream.Collectors
;
import
org.apache.skywalking.oap.server.core.CoreModule
;
import
org.apache.skywalking.oap.server.core.CoreModuleConfig
;
import
org.apache.skywalking.oap.server.core.cluster.RemoteInstance
;
import
org.apache.skywalking.oap.server.core.config.ConfigService
;
import
org.apache.skywalking.oap.server.core.remote.client.Address
;
import
org.apache.skywalking.oap.server.library.module.ModuleDefineHolder
;
import
org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting
;
import
org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.mockito.Mockito
;
import
org.junit.runner.RunWith
;
import
org.powermock.api.mockito.PowerMockito
;
import
org.powermock.api.support.membermodification.MemberModifier
;
import
org.powermock.core.classloader.annotations.PowerMockIgnore
;
import
org.powermock.core.classloader.annotations.PrepareForTest
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.reflect.Whitebox
;
import
static
org
.
hamcrest
.
core
.
Is
.
is
;
import
static
org
.
junit
.
Assert
.
assertThat
;
import
static
org
.
mockito
.
Mockito
.
when
;
import
static
org
.
powermock
.
api
.
mockito
.
PowerMockito
.
when
;
@RunWith
(
PowerMockRunner
.
class
)
@PowerMockIgnore
(
"javax.management.*"
)
@PrepareForTest
({
NamespacedPodListInformer
.
class
})
public
class
KubernetesCoordinatorTest
{
private
KubernetesCoordinator
coordinator
;
@Test
public
void
assertAdded
()
throws
InterruptedException
{
PlainWatch
watch
=
PlainWatch
.
create
(
2
,
"ADDED"
,
"1"
,
"10.0.0.1"
,
"ADDED"
,
"2"
,
"10.0.0.2"
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
watch
,
()
->
"1"
);
coordinator
.
start
();
coordinator
.
registerRemote
(
new
RemoteInstance
(
new
Address
(
"0.0.0.0"
,
8454
,
true
)));
watch
.
await
();
assertThat
(
coordinator
.
queryRemoteNodes
().
size
(),
is
(
2
));
assertThat
(
coordinator
.
queryRemoteNodes
()
.
stream
()
.
filter
(
instance
->
instance
.
getAddress
().
isSelf
())
.
findFirst
()
.
get
()
.
getAddress
()
.
getHost
(),
is
(
"10.0.0.1"
));
}
public
static
final
String
LOCAL_HOST
=
"127.0.0.1"
;
public
static
final
Integer
GRPC_PORT
=
8454
;
public
static
final
Integer
SELF_UID
=
12345
;
@Test
public
void
assertModified
()
throws
InterruptedException
{
PlainWatch
watch
=
PlainWatch
.
create
(
3
,
"ADDED"
,
"1"
,
"10.0.0.1"
,
"ADDED"
,
"2"
,
"10.0.0.2"
,
"MODIFIED"
,
"1"
,
"10.0.0.3"
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
watch
,
()
->
"1"
);
coordinator
.
start
();
coordinator
.
registerRemote
(
new
RemoteInstance
(
new
Address
(
"0.0.0.0"
,
8454
,
true
)));
watch
.
await
();
assertThat
(
coordinator
.
queryRemoteNodes
().
size
(),
is
(
2
));
assertThat
(
coordinator
.
queryRemoteNodes
()
.
stream
()
.
filter
(
instance
->
instance
.
getAddress
().
isSelf
())
.
findFirst
()
.
get
()
.
getAddress
()
.
getHost
(),
is
(
"10.0.0.3"
));
}
private
Address
selfAddress
;
private
NamespacedPodListInformer
informer
;
@Test
public
void
assertDeleted
()
throws
InterruptedException
{
PlainWatch
watch
=
PlainWatch
.
create
(
3
,
"ADDED"
,
"1"
,
"10.0.0.1"
,
"ADDED"
,
"2"
,
"10.0.0.2"
,
"DELETED"
,
"2"
,
"10.0.0.2"
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
watch
,
()
->
"1"
);
coordinator
.
start
();
coordinator
.
registerRemote
(
new
RemoteInstance
(
new
Address
(
"0.0.0.0"
,
8454
,
true
)));
watch
.
await
();
assertThat
(
coordinator
.
queryRemoteNodes
().
size
(),
is
(
1
));
assertThat
(
coordinator
.
queryRemoteNodes
()
.
stream
()
.
filter
(
instance
->
instance
.
getAddress
().
isSelf
())
.
findFirst
()
.
get
()
.
getAddress
()
.
getHost
(),
is
(
"10.0.0.1"
));
}
@Before
public
void
prepare
()
throws
IllegalAccessException
{
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
new
ClusterModuleKubernetesConfig
());
MemberModifier
.
field
(
KubernetesCoordinator
.
class
,
"uid"
).
set
(
coordinator
,
String
.
valueOf
(
SELF_UID
));
selfAddress
=
new
Address
(
LOCAL_HOST
,
GRPC_PORT
,
true
);
informer
=
PowerMockito
.
mock
(
NamespacedPodListInformer
.
class
);
Whitebox
.
setInternalState
(
NamespacedPodListInformer
.
class
,
"INFORMER"
,
informer
);
@Test
public
void
assertError
()
throws
InterruptedException
{
PlainWatch
watch
=
PlainWatch
.
create
(
3
,
"ADDED"
,
"1"
,
"10.0.0.1"
,
"ERROR"
,
"X"
,
"10.0.0.2"
,
"ADDED"
,
"2"
,
"10.0.0.2"
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
watch
,
()
->
"1"
);
coordinator
.
start
();
coordinator
.
registerRemote
(
new
RemoteInstance
(
new
Address
(
"0.0.0.0"
,
8454
,
true
)));
watch
.
await
();
assertThat
(
coordinator
.
queryRemoteNodes
().
size
(),
is
(
2
));
assertThat
(
coordinator
.
queryRemoteNodes
()
.
stream
()
.
filter
(
instance
->
instance
.
getAddress
().
isSelf
())
.
findFirst
()
.
get
()
.
getAddress
()
.
getHost
(),
is
(
"10.0.0.1"
));
}
@Test
public
void
assertModifiedInReceiverRole
()
throws
InterruptedException
{
PlainWatch
watch
=
PlainWatch
.
create
(
3
,
"ADDED"
,
"1"
,
"10.0.0.1"
,
"ADDED"
,
"2"
,
"10.0.0.2"
,
"MODIFIED"
,
"1"
,
"10.0.0.3"
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
watch
,
()
->
"1"
);
coordinator
.
start
();
watch
.
await
();
assertThat
(
coordinator
.
queryRemoteNodes
().
size
(),
is
(
2
));
assertThat
(
coordinator
.
queryRemoteNodes
()
.
stream
()
.
filter
(
instance
->
instance
.
getAddress
().
isSelf
())
.
findFirst
()
.
get
()
.
getAddress
()
.
getHost
(),
is
(
"10.0.0.3"
));
}
public
void
queryRemoteNodesWhenInformerNotwork
()
throws
Exception
{
PowerMockito
.
doReturn
(
Optional
.
empty
()).
when
(
NamespacedPodListInformer
.
INFORMER
).
listPods
();
List
<
RemoteInstance
>
remoteInstances
=
Whitebox
.
invokeMethod
(
coordinator
,
"queryRemoteNodes"
);
Assert
.
assertEquals
(
1
,
remoteInstances
.
size
());
Assert
.
assertEquals
(
selfAddress
,
remoteInstances
.
get
(
0
).
getAddress
());
@Test
public
void
assertDeletedInReceiverRole
()
throws
InterruptedException
{
PlainWatch
watch
=
PlainWatch
.
create
(
3
,
"ADDED"
,
"1"
,
"10.0.0.1"
,
"ADDED"
,
"2"
,
"10.0.0.2"
,
"DELETED"
,
"2"
,
"10.0.0.2"
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
watch
,
()
->
"1"
);
coordinator
.
start
();
watch
.
await
();
assertThat
(
coordinator
.
queryRemoteNodes
().
size
(),
is
(
1
));
assertThat
(
coordinator
.
queryRemoteNodes
()
.
stream
()
.
filter
(
instance
->
instance
.
getAddress
().
isSelf
())
.
findFirst
()
.
get
()
.
getAddress
()
.
getHost
(),
is
(
"10.0.0.1"
));
}
@Test
public
void
assertErrorInReceiverRole
()
throws
InterruptedException
{
PlainWatch
watch
=
PlainWatch
.
create
(
3
,
"ADDED"
,
"1"
,
"10.0.0.1"
,
"ERROR"
,
"X"
,
"10.0.0.2"
,
"ADDED"
,
"2"
,
"10.0.0.2"
);
coordinator
=
new
KubernetesCoordinator
(
getManager
(),
watch
,
()
->
"1"
);
coordinator
.
start
();
watch
.
await
();
assertThat
(
coordinator
.
queryRemoteNodes
().
size
(),
is
(
2
));
assertThat
(
coordinator
.
queryRemoteNodes
()
.
stream
()
.
filter
(
instance
->
instance
.
getAddress
().
isSelf
())
.
findFirst
()
.
get
()
.
getAddress
()
.
getHost
(),
is
(
"10.0.0.1"
));
public
void
queryRemoteNodesWhenInformerWork
()
throws
Exception
{
PowerMockito
.
doReturn
(
Optional
.
of
(
mockPodList
())).
when
(
NamespacedPodListInformer
.
INFORMER
).
listPods
();
List
<
RemoteInstance
>
remoteInstances
=
Whitebox
.
invokeMethod
(
coordinator
,
"queryRemoteNodes"
);
Assert
.
assertEquals
(
5
,
remoteInstances
.
size
());
List
<
RemoteInstance
>
self
=
remoteInstances
.
stream
()
.
filter
(
item
->
item
.
getAddress
().
isSelf
())
.
collect
(
Collectors
.
toList
());
List
<
RemoteInstance
>
others
=
remoteInstances
.
stream
()
.
filter
(
item
->
!
item
.
getAddress
().
isSelf
())
.
collect
(
Collectors
.
toList
());
Assert
.
assertEquals
(
1
,
self
.
size
());
Assert
.
assertEquals
(
4
,
others
.
size
());
}
p
ublic
ModuleDefineHolder
getManager
()
{
p
rivate
ModuleManagerTesting
getManager
()
{
ModuleManagerTesting
moduleManagerTesting
=
new
ModuleManagerTesting
();
ModuleDefineTesting
coreModuleDefine
=
new
ModuleDefineTesting
();
moduleManagerTesting
.
put
(
CoreModule
.
NAME
,
coreModuleDefine
);
CoreModuleConfig
config
=
Mockito
.
mock
(
CoreModuleConfig
.
class
);
when
(
config
.
getGRPCHost
()).
thenReturn
(
"127.0.0.1"
);
when
(
config
.
getGRPCPort
()).
thenReturn
(
8454
);
CoreModuleConfig
config
=
Power
Mockito
.
mock
(
CoreModuleConfig
.
class
);
when
(
config
.
getGRPCHost
()).
thenReturn
(
LOCAL_HOST
);
when
(
config
.
getGRPCPort
()).
thenReturn
(
GRPC_PORT
);
coreModuleDefine
.
provider
().
registerServiceImplementation
(
ConfigService
.
class
,
new
ConfigService
(
config
));
return
moduleManagerTesting
;
}
}
\ No newline at end of file
private
List
<
V1Pod
>
mockPodList
()
{
List
<
V1Pod
>
pods
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
V1Pod
v1Pod
=
new
V1Pod
();
v1Pod
.
setMetadata
(
new
V1ObjectMeta
());
v1Pod
.
setStatus
(
new
V1PodStatus
());
v1Pod
.
getMetadata
().
setUid
(
String
.
valueOf
(
SELF_UID
+
i
));
v1Pod
.
getStatus
().
setPodIP
(
LOCAL_HOST
);
pods
.
add
(
v1Pod
);
}
return
pods
;
}
}
oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/fixture/PlainWatch.java
已删除
100644 → 0
浏览文件 @
08897ce5
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture
;
import
java.util.ArrayList
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.NoSuchElementException
;
import
java.util.concurrent.CountDownLatch
;
import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.Event
;
import
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.ReusableWatch
;
public
class
PlainWatch
implements
ReusableWatch
<
Event
>
{
public
static
PlainWatch
create
(
final
int
size
,
final
String
...
args
)
{
List
<
Event
>
events
=
new
ArrayList
<>(
args
.
length
/
3
);
for
(
int
i
=
0
;
i
<
args
.
length
;
i
++)
{
events
.
add
(
new
Event
(
args
[
i
++],
args
[
i
++],
args
[
i
]));
}
return
new
PlainWatch
(
events
,
size
);
}
private
final
List
<
Event
>
events
;
private
final
int
size
;
private
final
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
private
Iterator
<
Event
>
iterator
;
private
int
count
;
private
PlainWatch
(
final
List
<
Event
>
events
,
final
int
size
)
{
this
.
events
=
events
;
this
.
size
=
size
;
}
@Override
public
void
initOrReset
()
{
final
Iterator
<
Event
>
internal
=
events
.
subList
(
count
,
events
.
size
()).
iterator
();
iterator
=
new
Iterator
<
Event
>()
{
public
boolean
hasNext
()
{
boolean
result
=
count
<
size
&&
internal
.
hasNext
();
if
(!
result
)
{
latch
.
countDown
();
}
return
result
;
}
public
Event
next
()
{
if
(!
this
.
hasNext
())
{
throw
new
NoSuchElementException
();
}
else
{
++
count
;
return
internal
.
next
();
}
}
public
void
remove
()
{
internal
.
remove
();
}
};
}
@Override
public
Iterator
<
Event
>
iterator
()
{
return
iterator
;
}
public
void
await
()
throws
InterruptedException
{
latch
.
await
();
}
}
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResource.java
浏览文件 @
ad47c904
...
...
@@ -19,14 +19,13 @@
package
org.apache.skywalking.oap.server.receiver.envoy.als
;
import
io.kubernetes.client.models.V1ObjectMeta
;
import
io.kubernetes.client.models.V1OwnerReference
;
import
io.kubernetes.client.openapi.models.V1ObjectMeta
;
import
io.kubernetes.client.openapi.models.V1OwnerReference
;
import
java.util.Optional
;
import
lombok.AccessLevel
;
import
lombok.Getter
;
import
lombok.RequiredArgsConstructor
;
import
java.util.Optional
;
@RequiredArgsConstructor
class
DependencyResource
{
@Getter
(
AccessLevel
.
PACKAGE
)
...
...
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/Fetcher.java
浏览文件 @
ad47c904
...
...
@@ -19,14 +19,13 @@
package
org.apache.skywalking.oap.server.receiver.envoy.als
;
import
io.kubernetes.client.ApiException
;
import
io.kubernetes.client.models.V1ObjectMeta
;
import
io.kubernetes.client.models.V1OwnerReference
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
io.kubernetes.client.openapi.ApiException
;
import
io.kubernetes.client.openapi.models.V1ObjectMeta
;
import
io.kubernetes.client.openapi.models.V1OwnerReference
;
import
java.util.Optional
;
import
java.util.function.Function
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
interface
Fetcher
extends
Function
<
V1OwnerReference
,
Optional
<
V1ObjectMeta
>>
{
...
...
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
浏览文件 @
ad47c904
此差异已折叠。
点击以展开。
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/DependencyResourceTest.java
浏览文件 @
ad47c904
...
...
@@ -19,16 +19,15 @@
package
org.apache.skywalking.oap.server.receiver.envoy.als
;
import
io.kubernetes.client.ApiException
;
import
io.kubernetes.client.models.V1ObjectMeta
;
import
io.kubernetes.client.models.V1OwnerReference
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.junit.runners.Parameterized
;
import
io.kubernetes.client.openapi.ApiException
;
import
io.kubernetes.client.openapi.models.V1ObjectMeta
;
import
io.kubernetes.client.openapi.models.V1OwnerReference
;
import
java.util.Arrays
;
import
java.util.Collection
;
import
java.util.Collections
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.junit.runners.Parameterized
;
import
static
org
.
hamcrest
.
core
.
Is
.
is
;
import
static
org
.
junit
.
Assert
.
assertThat
;
...
...
tools/dependencies/known-oap-backend-dependencies-es7.txt
浏览文件 @
ad47c904
...
...
@@ -5,24 +5,26 @@ antlr4-runtime-4.7.1.jar
aopalliance-1.0.jar
apollo-client-1.4.0.jar
apollo-core-1.4.0.jar
bcpkix-jdk15on-1.
59
.jar
bcprov-ext-jdk15on-1.
59
.jar
bcprov-jdk15on-1.
59
.jar
builder-annotations-0.
9.2
.jar
bcpkix-jdk15on-1.
61
.jar
bcprov-ext-jdk15on-1.
61
.jar
bcprov-jdk15on-1.
61
.jar
builder-annotations-0.
21.0
.jar
checker-qual-2.8.1.jar
client-java-
4
.0.0.jar
client-java-api-
4
.0.0.jar
client-java-proto-
4
.0.0.jar
client-java-
8
.0.0.jar
client-java-api-
8
.0.0.jar
client-java-proto-
8
.0.0.jar
commons-codec-1.11.jar
commons-compress-1.18.jar
commons-collections4-4.1.jar
commons-compress-1.19.jar
commons-dbcp-1.4.jar
commons-io-2.6.jar
commons-lang3-3.7.jar
commons-pool-1.5.4.jar
commons-text-1.4.jar
compiler-0.9.3.jar
consul-client-1.2.6.jar
converter-jackson-2.3.0.jar
co
mpiler-0.9.3
.jar
co
nverter-moshi-2.5.0
.jar
curator-client-4.0.1.jar
curator-framework-4.0.1.jar
curator-recipes-4.0.1.jar
...
...
@@ -49,6 +51,7 @@ grpc-protobuf-1.26.0.jar
grpc-protobuf-lite-1.26.0.jar
grpc-stub-1.26.0.jar
gson-2.8.6.jar
gson-fire-1.8.3.jar
guava-28.1-jre.jar
guice-4.1.0.jar
h2-1.4.196.jar
...
...
@@ -59,6 +62,7 @@ httpasyncclient-4.1.4.jar
httpclient-4.5.7.jar
httpcore-4.4.11.jar
httpcore-nio-4.4.11.jar
influxdb-java-2.15.jar
jackson-annotations-2.9.5.jar
jackson-core-2.9.5.jar
jackson-core-asl-1.9.13.jar
...
...
@@ -87,6 +91,7 @@ jna-4.5.1.jar
joda-convert-1.2.jar
joda-time-2.10.5.jar
jopt-simple-4.6.jar
jose4j-0.7.0.jar
json-flattener-0.6.0.jar
jsr305-3.0.2.jar
kotlin-reflect-1.1.1.jar
...
...
@@ -98,7 +103,7 @@ log4j-api-2.9.0.jar
log4j-core-2.9.0.jar
log4j-over-slf4j-1.7.25.jar
log4j-slf4j-impl-2.9.0.jar
logging-interceptor-
2.7.5
.jar
logging-interceptor-
3.13.1
.jar
lucene-analyzers-common-8.0.0.jar
lucene-backward-codecs-8.0.0.jar
lucene-core-8.0.0.jar
...
...
@@ -115,6 +120,8 @@ lucene-spatial-extras-8.0.0.jar
lucene-spatial3d-8.0.0.jar
lucene-suggest-8.0.0.jar
minimal-json-0.9.5.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
netty-3.10.5.Final.jar
netty-buffer-4.1.42.Final.jar
netty-codec-4.1.42.Final.jar
...
...
@@ -129,9 +136,7 @@ netty-resolver-4.1.42.Final.jar
netty-resolver-dns-4.1.42.Final.jar
netty-tcnative-boringssl-static-2.0.26.Final.jar
netty-transport-4.1.42.Final.jar
okhttp-2.7.5.jar
okhttp-3.9.0.jar
okhttp-ws-2.7.5.jar
okio-1.13.0.jar
opencensus-api-0.24.0.jar
opencensus-contrib-grpc-metrics-0.24.0.jar
...
...
@@ -143,7 +148,7 @@ protobuf-java-util-3.11.4.jar
rank-eval-client-7.0.0.jar
reactive-streams-1.0.2.jar
reflectasm-1.11.7.jar
resourcecify-annotations-0.
9.2
.jar
resourcecify-annotations-0.
21.0
.jar
retrofit-2.3.0.jar
simpleclient-0.6.0.jar
simpleclient_common-0.6.0.jar
...
...
@@ -151,15 +156,10 @@ simpleclient_hotspot-0.6.0.jar
simpleclient_httpserver-0.6.0.jar
slf4j-api-1.7.25.jar
snakeyaml-1.18.jar
sundr-codegen-0.
9.2
.jar
sundr-core-0.
9.2
.jar
swagger-annotations-1.5.
1
2.jar
sundr-codegen-0.
21.0
.jar
sundr-core-0.
21.0
.jar
swagger-annotations-1.5.
2
2.jar
t-digest-3.2.jar
zookeeper-3.4.10.jar
converter-moshi-2.5.0.jar
influxdb-java-2.15.jar
logging-interceptor-3.13.1.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
vavr-0.10.3.jar
vavr-match-0.10.3.jar
zookeeper-3.4.10.jar
tools/dependencies/known-oap-backend-dependencies.txt
浏览文件 @
ad47c904
HdrHistogram-2.1.9.jar
HikariCP-3.1.0.jar
aggs-matrix-stats-client-6.3.2.jar
animal-sniffer-annotations-1.18.jar
annotations-13.0.jar
...
...
@@ -5,17 +7,18 @@ antlr4-runtime-4.7.1.jar
aopalliance-1.0.jar
apollo-client-1.4.0.jar
apollo-core-1.4.0.jar
bcpkix-jdk15on-1.
59
.jar
bcprov-ext-jdk15on-1.
59
.jar
bcprov-jdk15on-1.
59
.jar
builder-annotations-0.
9.2
.jar
bcpkix-jdk15on-1.
61
.jar
bcprov-ext-jdk15on-1.
61
.jar
bcprov-jdk15on-1.
61
.jar
builder-annotations-0.
21.0
.jar
caffeine-2.6.2.jar
checker-qual-2.8.1.jar
client-java-
4
.0.0.jar
client-java-api-
4
.0.0.jar
client-java-proto-
4
.0.0.jar
client-java-
8
.0.0.jar
client-java-api-
8
.0.0.jar
client-java-proto-
8
.0.0.jar
commons-codec-1.11.jar
commons-compress-1.18.jar
commons-collections4-4.1.jar
commons-compress-1.19.jar
commons-dbcp-1.4.jar
commons-io-2.6.jar
commons-lang3-3.7.jar
...
...
@@ -23,6 +26,7 @@ commons-pool-1.5.4.jar
commons-text-1.4.jar
consul-client-1.2.6.jar
converter-jackson-2.3.0.jar
converter-moshi-2.5.0.jar
curator-client-4.0.1.jar
curator-framework-4.0.1.jar
curator-recipes-4.0.1.jar
...
...
@@ -48,16 +52,16 @@ grpc-protobuf-1.26.0.jar
grpc-protobuf-lite-1.26.0.jar
grpc-stub-1.26.0.jar
gson-2.8.6.jar
gson-fire-1.8.3.jar
guava-28.1-jre.jar
guice-4.1.0.jar
h2-1.4.196.jar
HdrHistogram-2.1.9.jar
HikariCP-3.1.0.jar
hppc-0.7.1.jar
httpasyncclient-4.1.2.jar
httpclient-4.5.2.jar
httpcore-4.4.5.jar
httpcore-nio-4.4.5.jar
influxdb-java-2.15.jar
jackson-annotations-2.9.5.jar
jackson-core-2.9.5.jar
jackson-core-asl-1.9.13.jar
...
...
@@ -86,6 +90,7 @@ jna-4.5.1.jar
joda-convert-1.2.jar
joda-time-2.10.5.jar
jopt-simple-4.6.jar
jose4j-0.7.0.jar
json-flattener-0.6.0.jar
jsr305-3.0.2.jar
kotlin-reflect-1.1.1.jar
...
...
@@ -96,7 +101,7 @@ log4j-api-2.9.0.jar
log4j-core-2.9.0.jar
log4j-over-slf4j-1.7.25.jar
log4j-slf4j-impl-2.9.0.jar
logging-interceptor-
2.7.5
.jar
logging-interceptor-
3.13.1
.jar
lucene-analyzers-common-7.3.1.jar
lucene-backward-codecs-7.3.1.jar
lucene-core-7.3.1.jar
...
...
@@ -113,6 +118,8 @@ lucene-spatial-extras-7.3.1.jar
lucene-spatial3d-7.3.1.jar
lucene-suggest-7.3.1.jar
minimal-json-0.9.5.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
netty-3.10.5.Final.jar
netty-buffer-4.1.42.Final.jar
netty-codec-4.1.42.Final.jar
...
...
@@ -127,9 +134,7 @@ netty-resolver-4.1.42.Final.jar
netty-resolver-dns-4.1.42.Final.jar
netty-tcnative-boringssl-static-2.0.26.Final.jar
netty-transport-4.1.42.Final.jar
okhttp-2.7.5.jar
okhttp-3.9.0.jar
okhttp-ws-2.7.5.jar
okio-1.13.0.jar
opencensus-api-0.24.0.jar
opencensus-contrib-grpc-metrics-0.24.0.jar
...
...
@@ -141,7 +146,7 @@ protobuf-java-util-3.11.4.jar
rank-eval-client-6.3.2.jar
reactive-streams-1.0.2.jar
reflectasm-1.11.7.jar
resourcecify-annotations-0.
9.2
.jar
resourcecify-annotations-0.
21.0
.jar
retrofit-2.3.0.jar
simpleclient-0.6.0.jar
simpleclient_common-0.6.0.jar
...
...
@@ -149,16 +154,11 @@ simpleclient_hotspot-0.6.0.jar
simpleclient_httpserver-0.6.0.jar
slf4j-api-1.7.25.jar
snakeyaml-1.18.jar
sundr-codegen-0.
9.2
.jar
sundr-core-0.
9.2
.jar
swagger-annotations-1.5.
1
2.jar
sundr-codegen-0.
21.0
.jar
sundr-core-0.
21.0
.jar
swagger-annotations-1.5.
2
2.jar
t-digest-3.2.jar
zipkin-2.9.1.jar
zookeeper-3.4.10.jar
converter-moshi-2.5.0.jar
influxdb-java-2.15.jar
logging-interceptor-3.13.1.jar
moshi-1.5.0.jar
msgpack-core-0.8.16.jar
vavr-0.10.3.jar
vavr-match-0.10.3.jar
zipkin-2.9.1.jar
zookeeper-3.4.10.jar
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录