Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
3fdfdf3b
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
3fdfdf3b
编写于
7月 19, 2020
作者:
G
Gao Hongtao
提交者:
GitHub
7月 19, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Storage elasticsearch health check (#5099)
上级
977062ba
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
368 addition
and
82 deletion
+368
-82
apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
.../main/java/org/apache/skywalking/apm/util/StringUtil.java
+8
-0
oap-server/server-library/library-client/pom.xml
oap-server/server-library/library-client/pom.xml
+6
-0
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
...ver/library/client/elasticsearch/ElasticSearchClient.java
+99
-26
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/DelegatedHealthChecker.java
...er/library/client/healthcheck/DelegatedHealthChecker.java
+39
-0
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthCheckable.java
...ap/server/library/client/healthcheck/HealthCheckable.java
+34
-0
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
...rver/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
+17
-17
oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/HealthChecker.java
...che/skywalking/oap/server/library/util/HealthChecker.java
+37
-0
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
...gin/elasticsearch/StorageModuleElasticsearchProvider.java
+7
-0
oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
...n/elasticsearch7/StorageModuleElasticsearch7Provider.java
+7
-0
oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
...ge/plugin/elasticsearch7/client/ElasticSearch7Client.java
+63
-26
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
.../oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+3
-10
oap-server/server-telemetry/telemetry-api/src/main/java/org/apache/skywalking/oap/server/telemetry/api/HealthCheckMetrics.java
...ywalking/oap/server/telemetry/api/HealthCheckMetrics.java
+45
-0
oap-server/server-telemetry/telemetry-api/src/main/java/org/apache/skywalking/oap/server/telemetry/api/MetricsCreator.java
...e/skywalking/oap/server/telemetry/api/MetricsCreator.java
+3
-3
未找到文件。
apm-commons/apm-util/src/main/java/org/apache/skywalking/apm/util/StringUtil.java
浏览文件 @
3fdfdf3b
...
...
@@ -18,6 +18,8 @@
package
org.apache.skywalking.apm.util
;
import
java.util.function.Consumer
;
public
final
class
StringUtil
{
public
static
boolean
isEmpty
(
String
str
)
{
return
str
==
null
||
str
.
length
()
==
0
;
...
...
@@ -27,6 +29,12 @@ public final class StringUtil {
return
!
isEmpty
(
str
);
}
public
static
void
setIfPresent
(
String
value
,
Consumer
<
String
>
setter
)
{
if
(
isNotEmpty
(
value
))
{
setter
.
accept
(
value
);
}
}
public
static
String
join
(
final
char
delimiter
,
final
String
...
strings
)
{
if
(
strings
.
length
==
0
)
{
return
null
;
...
...
oap-server/server-library/library-client/pom.xml
浏览文件 @
3fdfdf3b
...
...
@@ -29,6 +29,12 @@
<packaging>
jar
</packaging>
<dependencies>
<dependency>
<groupId>
org.apache.skywalking
</groupId>
<artifactId>
library-util
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
io.grpc
</groupId>
<artifactId>
grpc-core
</artifactId>
...
...
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
浏览文件 @
3fdfdf3b
...
...
@@ -37,7 +37,9 @@ import java.util.HashMap;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.locks.ReentrantLock
;
import
javax.net.ssl.SSLContext
;
import
lombok.RequiredArgsConstructor
;
import
lombok.Setter
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.http.HttpEntity
;
...
...
@@ -58,8 +60,11 @@ import org.apache.http.ssl.SSLContextBuilder;
import
org.apache.http.ssl.SSLContexts
;
import
org.apache.skywalking.apm.util.StringUtil
;
import
org.apache.skywalking.oap.server.library.client.Client
;
import
org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker
;
import
org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable
;
import
org.apache.skywalking.oap.server.library.client.request.InsertRequest
;
import
org.apache.skywalking.oap.server.library.client.request.UpdateRequest
;
import
org.apache.skywalking.oap.server.library.util.HealthChecker
;
import
org.elasticsearch.action.admin.indices.create.CreateIndexRequest
;
import
org.elasticsearch.action.admin.indices.create.CreateIndexResponse
;
import
org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
;
...
...
@@ -90,7 +95,8 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
* ElasticSearchClient connects to the ES server by using ES client APIs.
*/
@Slf4j
public
class
ElasticSearchClient
implements
Client
{
@RequiredArgsConstructor
public
class
ElasticSearchClient
implements
Client
,
HealthCheckable
{
public
static
final
String
TYPE
=
"type"
;
protected
final
String
clusterNodes
;
protected
final
String
protocol
;
...
...
@@ -103,6 +109,8 @@ public class ElasticSearchClient implements Client {
private
volatile
String
password
;
private
final
List
<
IndexNameConverter
>
indexNameConverters
;
protected
volatile
RestHighLevelClient
client
;
protected
DelegatedHealthChecker
healthChecker
=
new
DelegatedHealthChecker
();
protected
final
ReentrantLock
connectLock
=
new
ReentrantLock
();
public
ElasticSearchClient
(
String
clusterNodes
,
String
protocol
,
...
...
@@ -122,16 +130,21 @@ public class ElasticSearchClient implements Client {
@Override
public
void
connect
()
throws
IOException
,
KeyStoreException
,
NoSuchAlgorithmException
,
KeyManagementException
,
CertificateException
{
List
<
HttpHost
>
hosts
=
parseClusterNodes
(
protocol
,
clusterNodes
);
if
(
client
!=
null
)
{
try
{
client
.
close
();
}
catch
(
Throwable
t
)
{
log
.
error
(
"ElasticSearch client reconnection fails based on new config"
,
t
);
connectLock
.
lock
();
try
{
List
<
HttpHost
>
hosts
=
parseClusterNodes
(
protocol
,
clusterNodes
);
if
(
client
!=
null
)
{
try
{
client
.
close
();
}
catch
(
Throwable
t
)
{
log
.
error
(
"ElasticSearch client reconnection fails based on new config"
,
t
);
}
}
client
=
createClient
(
hosts
);
client
.
ping
();
}
finally
{
connectLock
.
unlock
();
}
client
=
createClient
(
hosts
);
client
.
ping
();
}
protected
RestHighLevelClient
createClient
(
...
...
@@ -208,10 +221,23 @@ public class ElasticSearchClient implements Client {
public
List
<
String
>
retrievalIndexByAliases
(
String
aliases
)
throws
IOException
{
aliases
=
formatIndexName
(
aliases
);
Response
response
=
client
.
getLowLevelClient
().
performRequest
(
HttpGet
.
METHOD_NAME
,
"/_alias/"
+
aliases
);
Response
response
;
try
{
response
=
client
.
getLowLevelClient
().
performRequest
(
HttpGet
.
METHOD_NAME
,
"/_alias/"
+
aliases
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
if
(
HttpStatus
.
SC_OK
==
response
.
getStatusLine
().
getStatusCode
())
{
Gson
gson
=
new
Gson
();
InputStreamReader
reader
=
new
InputStreamReader
(
response
.
getEntity
().
getContent
());
InputStreamReader
reader
;
try
{
reader
=
new
InputStreamReader
(
response
.
getEntity
().
getContent
());
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
JsonObject
responseJson
=
gson
.
fromJson
(
reader
,
JsonObject
.
class
);
log
.
debug
(
"retrieval indexes by aliases {}, response is {}"
,
aliases
,
responseJson
);
return
new
ArrayList
<>(
responseJson
.
keySet
());
...
...
@@ -310,13 +336,44 @@ public class ElasticSearchClient implements Client {
SearchRequest
searchRequest
=
new
SearchRequest
(
indexName
);
searchRequest
.
types
(
TYPE
);
searchRequest
.
source
(
searchSourceBuilder
);
return
client
.
search
(
searchRequest
);
try
{
SearchResponse
response
=
client
.
search
(
searchRequest
);
healthChecker
.
health
();
return
response
;
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
handleIOPoolStopped
(
t
);
throw
t
;
}
}
protected
void
handleIOPoolStopped
(
Throwable
t
)
throws
IOException
{
if
(!(
t
instanceof
IllegalStateException
))
{
return
;
}
IllegalStateException
ise
=
(
IllegalStateException
)
t
;
// Fixed the issue described in https://github.com/elastic/elasticsearch/issues/39946
if
(
ise
.
getMessage
().
contains
(
"I/O reactor status: STOPPED"
)
&&
connectLock
.
tryLock
())
{
try
{
connect
();
}
catch
(
KeyStoreException
|
NoSuchAlgorithmException
|
KeyManagementException
|
CertificateException
e
)
{
throw
new
IllegalStateException
(
"Can't reconnect to Elasticsearch"
,
e
);
}
}
}
public
GetResponse
get
(
String
indexName
,
String
id
)
throws
IOException
{
indexName
=
formatIndexName
(
indexName
);
GetRequest
request
=
new
GetRequest
(
indexName
,
TYPE
,
id
);
return
client
.
get
(
request
);
try
{
GetResponse
response
=
client
.
get
(
request
);
healthChecker
.
health
();
return
response
;
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
SearchResponse
ids
(
String
indexName
,
String
[]
ids
)
throws
IOException
{
...
...
@@ -325,28 +382,39 @@ public class ElasticSearchClient implements Client {
SearchRequest
searchRequest
=
new
SearchRequest
(
indexName
);
searchRequest
.
types
(
TYPE
);
searchRequest
.
source
().
query
(
QueryBuilders
.
idsQuery
().
addIds
(
ids
)).
size
(
ids
.
length
);
return
client
.
search
(
searchRequest
);
try
{
SearchResponse
response
=
client
.
search
(
searchRequest
);
healthChecker
.
health
();
return
response
;
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
void
forceInsert
(
String
indexName
,
String
id
,
XContentBuilder
source
)
throws
IOException
{
IndexRequest
request
=
(
IndexRequest
)
prepareInsert
(
indexName
,
id
,
source
);
request
.
setRefreshPolicy
(
WriteRequest
.
RefreshPolicy
.
IMMEDIATE
);
client
.
index
(
request
);
}
public
void
forceUpdate
(
String
indexName
,
String
id
,
XContentBuilder
source
,
long
version
)
throws
IOException
{
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
request
=
(
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
)
prepareUpdate
(
indexName
,
id
,
source
);
request
.
version
(
version
);
request
.
setRefreshPolicy
(
WriteRequest
.
RefreshPolicy
.
IMMEDIATE
);
client
.
update
(
request
);
try
{
client
.
index
(
request
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
void
forceUpdate
(
String
indexName
,
String
id
,
XContentBuilder
source
)
throws
IOException
{
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
request
=
(
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
)
prepareUpdate
(
indexName
,
id
,
source
);
request
.
setRefreshPolicy
(
WriteRequest
.
RefreshPolicy
.
IMMEDIATE
);
client
.
update
(
request
);
try
{
client
.
update
(
request
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
InsertRequest
prepareInsert
(
String
indexName
,
String
id
,
XContentBuilder
source
)
{
...
...
@@ -379,8 +447,9 @@ public class ElasticSearchClient implements Client {
int
size
=
request
.
requests
().
size
();
BulkResponse
responses
=
client
.
bulk
(
request
);
log
.
info
(
"Synchronous bulk took time: {} millis, size: {}"
,
responses
.
getTook
().
getMillis
(),
size
);
}
catch
(
IOException
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
}
}
...
...
@@ -431,4 +500,8 @@ public class ElasticSearchClient implements Client {
}
return
indexName
;
}
@Override
public
void
registerChecker
(
HealthChecker
healthChecker
)
{
this
.
healthChecker
.
register
(
healthChecker
);
}
}
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/DelegatedHealthChecker.java
0 → 100644
浏览文件 @
3fdfdf3b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.library.client.healthcheck
;
import
java.util.Optional
;
import
java.util.concurrent.atomic.AtomicReference
;
import
org.apache.skywalking.oap.server.library.util.HealthChecker
;
public
class
DelegatedHealthChecker
implements
HealthChecker
{
private
final
AtomicReference
<
HealthChecker
>
delegated
=
new
AtomicReference
<>();
@Override
public
void
health
()
{
Optional
.
ofNullable
(
delegated
.
get
()).
ifPresent
(
HealthChecker:
:
health
);
}
@Override
public
void
unHealth
(
Throwable
t
)
{
Optional
.
ofNullable
(
delegated
.
get
()).
ifPresent
(
d
->
d
.
unHealth
(
t
));
}
public
void
register
(
HealthChecker
healthChecker
)
{
delegated
.
set
(
healthChecker
);
}
}
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthCheckable.java
0 → 100644
浏览文件 @
3fdfdf3b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.library.client.healthcheck
;
import
org.apache.skywalking.oap.server.library.util.HealthChecker
;
/**
* HealthCheckable indicate the client has the capacity of health check and need to register healthChecker.
*/
public
interface
HealthCheckable
{
/**
* Register health checker.
*
* @param healthChecker HealthChecker to be registered.
*/
void
registerChecker
(
HealthChecker
healthChecker
);
}
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
浏览文件 @
3fdfdf3b
...
...
@@ -26,37 +26,27 @@ import java.sql.ResultSet;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.util.Properties
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
import
java.util.function.Consumer
;
import
org.apache.skywalking.oap.server.library.client.Client
;
import
org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker
;
import
org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable
;
import
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException
;
import
org.apache.skywalking.oap.server.library.util.HealthChecker
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* JDBC Client uses HikariCP connection management lib to execute SQL.
*/
public
class
JDBCHikariCPClient
implements
Client
{
public
class
JDBCHikariCPClient
implements
Client
,
HealthCheckable
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
JDBCHikariCPClient
.
class
);
private
final
HikariConfig
hikariConfig
;
private
final
DelegatedHealthChecker
healthChecker
;
private
HikariDataSource
dataSource
;
private
HikariConfig
hikariConfig
;
public
JDBCHikariCPClient
(
Properties
properties
)
{
hikariConfig
=
new
HikariConfig
(
properties
);
}
public
void
setHealthCheckListener
(
Consumer
<
Boolean
>
healthListener
)
{
ScheduledExecutorService
asyncHealthScheduler
=
Executors
.
newSingleThreadScheduledExecutor
();
asyncHealthScheduler
.
scheduleAtFixedRate
(()
->
{
try
(
Connection
c
=
dataSource
.
getConnection
())
{
healthListener
.
accept
(
true
);
}
catch
(
SQLException
ignored
)
{
healthListener
.
accept
(
false
);
}
},
0
,
3
,
TimeUnit
.
SECONDS
);
this
.
healthChecker
=
new
DelegatedHealthChecker
();
}
@Override
...
...
@@ -93,7 +83,9 @@ public class JDBCHikariCPClient implements Client {
logger
.
debug
(
"execute aql: {}"
,
sql
);
try
(
Statement
statement
=
connection
.
createStatement
())
{
statement
.
execute
(
sql
);
healthChecker
.
health
();
}
catch
(
SQLException
e
)
{
healthChecker
.
unHealth
(
e
);
throw
new
JDBCClientException
(
e
.
getMessage
(),
e
);
}
}
...
...
@@ -107,6 +99,7 @@ public class JDBCHikariCPClient implements Client {
setStatementParam
(
statement
,
params
);
result
=
statement
.
execute
();
statement
.
closeOnCompletion
();
healthChecker
.
health
();
}
catch
(
SQLException
e
)
{
if
(
statement
!=
null
)
{
try
{
...
...
@@ -114,6 +107,7 @@ public class JDBCHikariCPClient implements Client {
}
catch
(
SQLException
e1
)
{
}
}
healthChecker
.
unHealth
(
e
);
throw
new
JDBCClientException
(
e
.
getMessage
(),
e
);
}
...
...
@@ -129,6 +123,7 @@ public class JDBCHikariCPClient implements Client {
setStatementParam
(
statement
,
params
);
rs
=
statement
.
executeQuery
();
statement
.
closeOnCompletion
();
healthChecker
.
health
();
}
catch
(
SQLException
e
)
{
if
(
statement
!=
null
)
{
try
{
...
...
@@ -136,6 +131,7 @@ public class JDBCHikariCPClient implements Client {
}
catch
(
SQLException
e1
)
{
}
}
healthChecker
.
unHealth
(
e
);
throw
new
JDBCClientException
(
e
.
getMessage
(),
e
);
}
...
...
@@ -161,4 +157,8 @@ public class JDBCHikariCPClient implements Client {
}
}
}
@Override
public
void
registerChecker
(
HealthChecker
healthChecker
)
{
this
.
healthChecker
.
register
(
healthChecker
);
}
}
oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/HealthChecker.java
0 → 100644
浏览文件 @
3fdfdf3b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.library.util
;
/**
* Health checker provides methods to register the health status.
*/
public
interface
HealthChecker
{
/**
* It's health.
*/
void
health
();
/**
* It's unHealth.
*
* @param t details of unhealthy status
*/
void
unHealth
(
Throwable
t
);
}
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
浏览文件 @
3fdfdf3b
...
...
@@ -74,6 +74,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNR
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO
;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO
;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.UITemplateManagementEsDAO
;
import
org.apache.skywalking.oap.server.telemetry.TelemetryModule
;
import
org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics
;
import
org.apache.skywalking.oap.server.telemetry.api.MetricsCreator
;
import
org.apache.skywalking.oap.server.telemetry.api.MetricsTag
;
/**
* The storage provider for ElasticSearch 6.
...
...
@@ -181,6 +185,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public
void
start
()
throws
ModuleStartException
{
MetricsCreator
metricCreator
=
getManager
().
find
(
TelemetryModule
.
NAME
).
provider
().
getService
(
MetricsCreator
.
class
);
HealthCheckMetrics
healthChecker
=
metricCreator
.
createHealthCheckerGauge
(
"storage_elasticsearch"
,
MetricsTag
.
EMPTY_KEY
,
MetricsTag
.
EMPTY_VALUE
);
elasticSearchClient
.
registerChecker
(
healthChecker
);
try
{
elasticSearchClient
.
connect
();
StorageEsInstaller
installer
=
new
StorageEsInstaller
(
elasticSearchClient
,
getManager
(),
config
);
...
...
oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
浏览文件 @
3fdfdf3b
...
...
@@ -70,6 +70,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.Meta
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.MetricsQueryEs7DAO
;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.ProfileThreadSnapshotQueryEs7DAO
;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.TraceQueryEs7DAO
;
import
org.apache.skywalking.oap.server.telemetry.TelemetryModule
;
import
org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics
;
import
org.apache.skywalking.oap.server.telemetry.api.MetricsCreator
;
import
org.apache.skywalking.oap.server.telemetry.api.MetricsTag
;
import
static
org
.
apache
.
skywalking
.
oap
.
server
.
storage
.
plugin
.
elasticsearch
.
StorageModuleElasticsearchProvider
.
indexNameConverters
;
...
...
@@ -186,6 +190,9 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
@Override
public
void
start
()
throws
ModuleStartException
{
MetricsCreator
metricCreator
=
getManager
().
find
(
TelemetryModule
.
NAME
).
provider
().
getService
(
MetricsCreator
.
class
);
HealthCheckMetrics
healthChecker
=
metricCreator
.
createHealthCheckerGauge
(
"storage_elasticsearch"
,
MetricsTag
.
EMPTY_KEY
,
MetricsTag
.
EMPTY_VALUE
);
elasticSearch7Client
.
registerChecker
(
healthChecker
);
try
{
elasticSearch7Client
.
connect
();
...
...
oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
浏览文件 @
3fdfdf3b
...
...
@@ -84,16 +84,21 @@ public class ElasticSearch7Client extends ElasticSearchClient {
@Override
public
void
connect
()
throws
IOException
,
KeyStoreException
,
NoSuchAlgorithmException
,
KeyManagementException
,
CertificateException
{
if
(
client
!=
null
)
{
try
{
client
.
close
();
}
catch
(
Throwable
t
)
{
log
.
error
(
"ElasticSearch7 client reconnection fails based on new config"
,
t
);
connectLock
.
lock
();
try
{
if
(
client
!=
null
)
{
try
{
client
.
close
();
}
catch
(
Throwable
t
)
{
log
.
error
(
"ElasticSearch7 client reconnection fails based on new config"
,
t
);
}
}
List
<
HttpHost
>
hosts
=
parseClusterNodes
(
protocol
,
clusterNodes
);
client
=
createClient
(
hosts
);
client
.
ping
(
RequestOptions
.
DEFAULT
);
}
finally
{
connectLock
.
unlock
();
}
List
<
HttpHost
>
hosts
=
parseClusterNodes
(
protocol
,
clusterNodes
);
client
=
createClient
(
hosts
);
client
.
ping
(
RequestOptions
.
DEFAULT
);
}
public
boolean
createIndex
(
String
indexName
)
throws
IOException
{
...
...
@@ -124,7 +129,14 @@ public class ElasticSearch7Client extends ElasticSearchClient {
aliases
=
formatIndexName
(
aliases
);
GetAliasesRequest
getAliasesRequest
=
new
GetAliasesRequest
(
aliases
);
GetAliasesResponse
alias
=
client
.
indices
().
getAlias
(
getAliasesRequest
,
RequestOptions
.
DEFAULT
);
GetAliasesResponse
alias
;
try
{
alias
=
client
.
indices
().
getAlias
(
getAliasesRequest
,
RequestOptions
.
DEFAULT
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
return
new
ArrayList
<>(
alias
.
getAliases
().
keySet
());
}
...
...
@@ -184,13 +196,28 @@ public class ElasticSearch7Client extends ElasticSearchClient {
indexName
=
formatIndexName
(
indexName
);
SearchRequest
searchRequest
=
new
SearchRequest
(
indexName
);
searchRequest
.
source
(
searchSourceBuilder
);
return
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
);
try
{
SearchResponse
response
=
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
);
healthChecker
.
health
();
return
response
;
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
handleIOPoolStopped
(
t
);
throw
t
;
}
}
public
GetResponse
get
(
String
indexName
,
String
id
)
throws
IOException
{
indexName
=
formatIndexName
(
indexName
);
GetRequest
request
=
new
GetRequest
(
indexName
,
id
);
return
client
.
get
(
request
,
RequestOptions
.
DEFAULT
);
try
{
GetResponse
response
=
client
.
get
(
request
,
RequestOptions
.
DEFAULT
);
healthChecker
.
health
();
return
response
;
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
SearchResponse
ids
(
String
indexName
,
String
[]
ids
)
throws
IOException
{
...
...
@@ -198,30 +225,39 @@ public class ElasticSearch7Client extends ElasticSearchClient {
SearchRequest
searchRequest
=
new
SearchRequest
(
indexName
);
searchRequest
.
source
().
query
(
QueryBuilders
.
idsQuery
().
addIds
(
ids
)).
size
(
ids
.
length
);
return
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
);
try
{
SearchResponse
response
=
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
);
healthChecker
.
health
();
return
response
;
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
void
forceInsert
(
String
indexName
,
String
id
,
XContentBuilder
source
)
throws
IOException
{
IndexRequest
request
=
(
IndexRequest
)
prepareInsert
(
indexName
,
id
,
source
);
request
.
setRefreshPolicy
(
WriteRequest
.
RefreshPolicy
.
IMMEDIATE
);
client
.
index
(
request
,
RequestOptions
.
DEFAULT
);
}
public
void
forceUpdate
(
String
indexName
,
String
id
,
XContentBuilder
source
,
long
seqNo
,
long
primaryTerm
)
throws
IOException
{
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
request
=
(
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
)
prepareUpdate
(
indexName
,
id
,
source
);
request
.
setIfSeqNo
(
seqNo
);
request
.
setIfPrimaryTerm
(
primaryTerm
);
request
.
setRefreshPolicy
(
WriteRequest
.
RefreshPolicy
.
IMMEDIATE
);
client
.
update
(
request
,
RequestOptions
.
DEFAULT
);
try
{
client
.
index
(
request
,
RequestOptions
.
DEFAULT
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
void
forceUpdate
(
String
indexName
,
String
id
,
XContentBuilder
source
)
throws
IOException
{
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
request
=
(
org
.
elasticsearch
.
action
.
update
.
UpdateRequest
)
prepareUpdate
(
indexName
,
id
,
source
);
request
.
setRefreshPolicy
(
WriteRequest
.
RefreshPolicy
.
IMMEDIATE
);
client
.
update
(
request
,
RequestOptions
.
DEFAULT
);
try
{
client
.
update
(
request
,
RequestOptions
.
DEFAULT
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
throw
t
;
}
}
public
InsertRequest
prepareInsert
(
String
indexName
,
String
id
,
XContentBuilder
source
)
{
...
...
@@ -256,8 +292,9 @@ public class ElasticSearch7Client extends ElasticSearchClient {
int
size
=
request
.
requests
().
size
();
BulkResponse
responses
=
client
.
bulk
(
request
,
RequestOptions
.
DEFAULT
);
log
.
info
(
"Synchronous bulk took time: {} millis, size: {}"
,
responses
.
getTook
().
getMillis
(),
size
);
}
catch
(
IOException
e
)
{
log
.
error
(
e
.
getMessage
(),
e
);
healthChecker
.
health
();
}
catch
(
Throwable
t
)
{
healthChecker
.
unHealth
(
t
);
}
}
...
...
oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
浏览文件 @
3fdfdf3b
...
...
@@ -64,7 +64,7 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQue
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO
;
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2UITemplateManagementDAO
;
import
org.apache.skywalking.oap.server.telemetry.TelemetryModule
;
import
org.apache.skywalking.oap.server.telemetry.api.
Gauge
Metrics
;
import
org.apache.skywalking.oap.server.telemetry.api.
HealthCheck
Metrics
;
import
org.apache.skywalking.oap.server.telemetry.api.MetricsCreator
;
import
org.apache.skywalking.oap.server.telemetry.api.MetricsTag
;
...
...
@@ -136,8 +136,8 @@ public class H2StorageProvider extends ModuleProvider {
@Override
public
void
start
()
throws
ServiceNotProvidedException
,
ModuleStartException
{
MetricsCreator
metricCreator
=
getManager
().
find
(
TelemetryModule
.
NAME
).
provider
().
getService
(
MetricsCreator
.
class
);
Gauge
Metrics
healthChecker
=
metricCreator
.
createHealthCheckerGauge
(
"storage_h2"
,
MetricsTag
.
EMPTY_KEY
,
MetricsTag
.
EMPTY_VALUE
);
h
ealthChecker
.
setValue
(
1
);
HealthCheck
Metrics
healthChecker
=
metricCreator
.
createHealthCheckerGauge
(
"storage_h2"
,
MetricsTag
.
EMPTY_KEY
,
MetricsTag
.
EMPTY_VALUE
);
h
2Client
.
registerChecker
(
healthChecker
);
try
{
h2Client
.
connect
();
...
...
@@ -146,13 +146,6 @@ public class H2StorageProvider extends ModuleProvider {
}
catch
(
StorageException
e
)
{
throw
new
ModuleStartException
(
e
.
getMessage
(),
e
);
}
h2Client
.
setHealthCheckListener
(
isHealthy
->
{
if
(
isHealthy
)
{
healthChecker
.
setValue
(
0
);
}
else
{
healthChecker
.
setValue
(
1
);
}
});
}
@Override
...
...
oap-server/server-telemetry/telemetry-api/src/main/java/org/apache/skywalking/oap/server/telemetry/api/HealthCheckMetrics.java
0 → 100644
浏览文件 @
3fdfdf3b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.oap.server.telemetry.api
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.skywalking.oap.server.library.util.HealthChecker
;
/**
* HealthCheckMetrics intends to record health status.
*/
@Slf4j
public
class
HealthCheckMetrics
implements
HealthChecker
{
private
final
GaugeMetrics
metrics
;
public
HealthCheckMetrics
(
GaugeMetrics
metrics
)
{
this
.
metrics
=
metrics
;
// The initial status is unhealthy with -1 code.
metrics
.
setValue
(-
1
);
}
public
void
health
()
{
metrics
.
setValue
(
0
);
}
public
void
unHealth
(
Throwable
t
)
{
log
.
error
(
"Health check fails"
,
t
);
metrics
.
setValue
(
1
);
}
}
oap-server/server-telemetry/telemetry-api/src/main/java/org/apache/skywalking/oap/server/telemetry/api/MetricsCreator.java
浏览文件 @
3fdfdf3b
...
...
@@ -50,11 +50,11 @@ public interface MetricsCreator extends Service {
/**
* Create a Health Check gauge.
*/
default
Gauge
Metrics
createHealthCheckerGauge
(
String
name
,
MetricsTag
.
Keys
tagKeys
,
MetricsTag
.
Values
tagValues
)
{
default
HealthCheck
Metrics
createHealthCheckerGauge
(
String
name
,
MetricsTag
.
Keys
tagKeys
,
MetricsTag
.
Values
tagValues
)
{
Preconditions
.
checkArgument
(!
Strings
.
isNullOrEmpty
(
name
),
"Require non-null or empty metric name"
);
return
createGauge
(
Strings
.
lenientFormat
(
"%s%s"
,
HEALTH_METRIC_PREFIX
,
name
),
return
new
HealthCheckMetrics
(
createGauge
(
Strings
.
lenientFormat
(
"%s%s"
,
HEALTH_METRIC_PREFIX
,
name
),
Strings
.
lenientFormat
(
"%s health check"
,
name
),
tagKeys
,
tagValues
);
tagKeys
,
tagValues
)
)
;
}
/**
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录