未验证 提交 c45bb361 编写于 作者: H haoyann 提交者: GitHub

Storage plugin supports PostgreSQL (#6345)

上级 6c6a8a71
...@@ -31,7 +31,7 @@ jobs: ...@@ -31,7 +31,7 @@ jobs:
timeout-minutes: 90 timeout-minutes: 90
strategy: strategy:
matrix: matrix:
storage: ['mysql', 'es6', 'es7.0', 'es7.10', 'influxdb', 'tidb'] storage: ['mysql', 'es6', 'es7.0', 'es7.10', 'influxdb', 'tidb', 'postgresql']
env: env:
SW_STORAGE: ${{ matrix.storage }} SW_STORAGE: ${{ matrix.storage }}
steps: steps:
......
...@@ -34,7 +34,7 @@ jobs: ...@@ -34,7 +34,7 @@ jobs:
timeout-minutes: 90 timeout-minutes: 90
strategy: strategy:
matrix: matrix:
storage: ['es6', 'es7', 'influxdb', 'tidb'] storage: ['es6', 'es7', 'influxdb', 'tidb', 'postgresql']
env: env:
SW_STORAGE: ${{ matrix.storage }} SW_STORAGE: ${{ matrix.storage }}
steps: steps:
......
...@@ -22,6 +22,7 @@ Release Notes. ...@@ -22,6 +22,7 @@ Release Notes.
* Support Zabbix protocol to receive agent metrics. * Support Zabbix protocol to receive agent metrics.
* Update the Apdex metric combine calculator. * Update the Apdex metric combine calculator.
* Enhance `MeterSystem` to allow creating metrics with same `metricName` / `function` / `scope`. * Enhance `MeterSystem` to allow creating metrics with same `metricName` / `function` / `scope`.
* Storage plugin supports postgresql.
#### UI #### UI
* Update selector scroller to show in all pages. * Update selector scroller to show in all pages.
......
...@@ -392,6 +392,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. ...@@ -392,6 +392,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
proto files from gogo: https://github.com/gogo/protobuf BSD-2 proto files from gogo: https://github.com/gogo/protobuf BSD-2
proto files from grpc-gateway, https://github.com/grpc-ecosystem/grpc-gateway/tree/master/protoc-gen-swagger/options BSD-3 proto files from grpc-gateway, https://github.com/grpc-ecosystem/grpc-gateway/tree/master/protoc-gen-swagger/options BSD-3
zstd-jni 1.4.3-1: https://github.com/luben/zstd-jni, BSD-3-Clause zstd-jni 1.4.3-1: https://github.com/luben/zstd-jni, BSD-3-Clause
postgresql 42.2.18: https://jdbc.postgresql.org/about/license.html, BSD-2-Clause
======================================================================== ========================================================================
MPL 2.0 licenses MPL 2.0 licenses
......
Copyright (c) 1997, PostgreSQL Global Development Group
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
...@@ -13,6 +13,7 @@ Native supported storage ...@@ -13,6 +13,7 @@ Native supported storage
- MySQL - MySQL
- TiDB - TiDB
- InfluxDB - InfluxDB
- PostgreSQL
Redistribution version with supported storage. Redistribution version with supported storage.
- ElasticSearch 5 - ElasticSearch 5
...@@ -265,6 +266,29 @@ storage: ...@@ -265,6 +266,29 @@ storage:
``` ```
All connection related settings including link url, username and password are in `application.yml`. The Metadata storage provider settings can refer to the configuration of **H2/MySQL** above. All connection related settings including link url, username and password are in `application.yml`. The Metadata storage provider settings can refer to the configuration of **H2/MySQL** above.
## PostgreSQL
PostgreSQL jdbc driver uses version 42.2.18, it supports PostgreSQL 8.2 or newer.
Active PostgreSQL as storage, set storage provider to **postgresql**.
```yaml
storage:
selector: ${SW_STORAGE:postgresql}
postgresql:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:postgresql://localhost:5432/skywalking"}
dataSource.user: ${SW_DATA_SOURCE_USER:postgres}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:123456}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
dataSource.prepStmtCacheSize: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_SIZE:250}
dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
```
All connection related settings including link url, username and password are in `application.yml`.
Here are some of the settings, please follow [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for all the settings.
## ElasticSearch 5 ## ElasticSearch 5
ElasticSearch 5 is incompatible with ElasticSearch 6 Java client jar, so it could not be included in native distribution. ElasticSearch 5 is incompatible with ElasticSearch 6 Java client jar, so it could not be included in native distribution.
[OpenSkyWalking/SkyWalking-With-Es5x-Storage](https://github.com/OpenSkywalking/SkyWalking-With-Es5x-Storage) repo includes the distribution version. [OpenSkyWalking/SkyWalking-With-Es5x-Storage](https://github.com/OpenSkywalking/SkyWalking-With-Es5x-Storage) repo includes the distribution version.
......
...@@ -132,6 +132,11 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode ...@@ -132,6 +132,11 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 | | - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities, such as trace segment, include the logic column with multiple values. In the MySQL, we use multiple physical columns to host the values, such as, Change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 | | - | - | maxSizeOfArrayColumn | Some entities, such as trace segment, include the logic column with multiple values. In the MySQL, we use multiple physical columns to host the values, such as, Change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | In a trace segment, it includes multiple spans with multiple tags. Different spans could have same tag keys, such as multiple HTTP exit spans all have their own `http.method` tag. This configuration set the limitation of max num of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 | | - | - | numOfSearchableValuesPerTag | In a trace segment, it includes multiple spans with multiple tags. Different spans could have same tag keys, such as multiple HTTP exit spans all have their own `http.method` tag. This configuration set the limitation of max num of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - |postgresql| - | PostgreSQL storage. | - | - |
| - | - | properties | Hikari connection pool configurations | - | Listed in the `application.yaml`. |
| - | - | metadataQueryMaxSize | The max size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities, such as trace segment, include the logic column with multiple values. In the PostgreSQL, we use multiple physical columns to host the values, such as, Change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3 , column_a_3 = 4, column_a_4 = 5` | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | In a trace segment, it includes multiple spans with multiple tags. Different spans could have same tag keys, such as multiple HTTP exit spans all have their own `http.method` tag. This configuration set the limitation of max num of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - |influxdb| - | InfluxDB storage. |- | - | | - |influxdb| - | InfluxDB storage. |- | - |
| - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086| | - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086|
| - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root| | - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root|
......
...@@ -101,6 +101,7 @@ ...@@ -101,6 +101,7 @@
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version> <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
<commons-beanutils.version>1.9.4</commons-beanutils.version> <commons-beanutils.version>1.9.4</commons-beanutils.version>
<flatbuffers-java.version>1.12.0</flatbuffers-java.version> <flatbuffers-java.version>1.12.0</flatbuffers-java.version>
<postgresql.version>42.2.18</postgresql.version>
</properties> </properties>
<dependencies> <dependencies>
...@@ -561,6 +562,12 @@ ...@@ -561,6 +562,12 @@
<artifactId>flatbuffers-java</artifactId> <artifactId>flatbuffers-java</artifactId>
<version>${flatbuffers-java.version}</version> <version>${flatbuffers-java.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>
...@@ -201,6 +201,18 @@ storage: ...@@ -201,6 +201,18 @@ storage:
batchEnabled: ${SW_STORAGE_INFLUXDB_BATCH_ENABLED:true} batchEnabled: ${SW_STORAGE_INFLUXDB_BATCH_ENABLED:true}
fetchTaskLogMaxSize: ${SW_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE:5000} # the max number of fetch task log in a request fetchTaskLogMaxSize: ${SW_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE:5000} # the max number of fetch task log in a request
connectionResponseFormat: ${SW_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT:MSGPACK} # the response format of connection to influxDB, cannot be anything but MSGPACK or JSON. connectionResponseFormat: ${SW_STORAGE_INFLUXDB_CONNECTION_RESPONSE_FORMAT:MSGPACK} # the response format of connection to influxDB, cannot be anything but MSGPACK or JSON.
postgresql:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:postgresql://localhost:5432/skywalking"}
dataSource.user: ${SW_DATA_SOURCE_USER:postgres}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:123456}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
dataSource.prepStmtCacheSize: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_SIZE:250}
dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
agent-analyzer: agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default} selector: ${SW_AGENT_ANALYZER:default}
......
...@@ -48,6 +48,10 @@ ...@@ -48,6 +48,10 @@
<!-- <artifactId>mysql-connector-java</artifactId>--> <!-- <artifactId>mysql-connector-java</artifactId>-->
<!-- <version>8.0.13</version>--> <!-- <version>8.0.13</version>-->
<!-- </dependency>--> <!-- </dependency>-->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -49,15 +49,8 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO { ...@@ -49,15 +49,8 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO {
final String valueColumnName, final String valueColumnName,
final Duration duration, final Duration duration,
List<KeyValue> additionalConditions) throws IOException { List<KeyValue> additionalConditions) throws IOException {
StringBuilder sql = new StringBuilder();
List<Object> conditions = new ArrayList<>(10); List<Object> conditions = new ArrayList<>(10);
sql.append("select * from (select avg(") StringBuilder sql = buildMetricsValueSql(valueColumnName, metrics.getName());
.append(valueColumnName)
.append(") value,")
.append(Metrics.ENTITY_ID)
.append(" from ")
.append(metrics.getName())
.append(" where ");
sql.append(Metrics.TIME_BUCKET).append(" >= ? and ").append(Metrics.TIME_BUCKET).append(" <= ?"); sql.append(Metrics.TIME_BUCKET).append(" >= ? and ").append(Metrics.TIME_BUCKET).append(" <= ?");
conditions.add(duration.getStartTimeBucket()); conditions.add(duration.getStartTimeBucket());
conditions.add(duration.getEndTimeBucket()); conditions.add(duration.getEndTimeBucket());
...@@ -87,4 +80,16 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO { ...@@ -87,4 +80,16 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO {
} }
return topNEntities; return topNEntities;
} }
protected StringBuilder buildMetricsValueSql(String valueColumnName, String metricsName) {
StringBuilder sql = new StringBuilder();
sql.append("select * from (select avg(")
.append(valueColumnName)
.append(") value,")
.append(Metrics.ENTITY_ID)
.append(" from ")
.append(metricsName)
.append(" where ");
return sql;
}
} }
...@@ -65,8 +65,7 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO ...@@ -65,8 +65,7 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
default: default:
op = "sum"; op = "sum";
} }
StringBuilder sql = new StringBuilder( StringBuilder sql = buildMetricsValueSql(op, valueColumnName, condition.getName());
"select " + Metrics.ENTITY_ID + " id, " + op + "(" + valueColumnName + ") value from " + condition.getName() + " where ");
final String entityId = condition.getEntity().buildId(); final String entityId = condition.getEntity().buildId();
List<Object> parameters = new ArrayList(); List<Object> parameters = new ArrayList();
if (entityId != null) { if (entityId != null) {
...@@ -93,6 +92,11 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO ...@@ -93,6 +92,11 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
return defaultValue; return defaultValue;
} }
protected StringBuilder buildMetricsValueSql(String op, String valueColumnName, String conditionName) {
return new StringBuilder(
"select " + Metrics.ENTITY_ID + " id, " + op + "(" + valueColumnName + ") value from " + conditionName + " where ");
}
@Override @Override
public MetricsValues readMetricsValues(final MetricsCondition condition, public MetricsValues readMetricsValues(final MetricsCondition condition,
final String valueColumnName, final String valueColumnName,
......
...@@ -69,7 +69,7 @@ public class MySQLAlarmQueryDAO implements IAlarmQueryDAO { ...@@ -69,7 +69,7 @@ public class MySQLAlarmQueryDAO implements IAlarmQueryDAO {
Alarms alarms = new Alarms(); Alarms alarms = new Alarms();
try (Connection connection = client.getConnection()) { try (Connection connection = client.getConnection()) {
try (ResultSet resultSet = client.executeQuery(connection, "select count(1) total " + sql.toString(), parameters try (ResultSet resultSet = client.executeQuery(connection, buildCountStatement(sql.toString()), parameters
.toArray(new Object[0]))) { .toArray(new Object[0]))) {
while (resultSet.next()) { while (resultSet.next()) {
alarms.setTotal(resultSet.getInt("total")); alarms.setTotal(resultSet.getInt("total"));
...@@ -97,7 +97,11 @@ public class MySQLAlarmQueryDAO implements IAlarmQueryDAO { ...@@ -97,7 +97,11 @@ public class MySQLAlarmQueryDAO implements IAlarmQueryDAO {
return alarms; return alarms;
} }
private void buildLimit(StringBuilder sql, int from, int limit) { protected void buildLimit(StringBuilder sql, int from, int limit) {
sql.append(" LIMIT ").append(from).append(", ").append(limit); sql.append(" LIMIT ").append(from).append(", ").append(limit);
} }
protected String buildCountStatement(String sql) {
return "select count(1) total " + sql;
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLStorageConfig;
@Setter
@Getter
public class PostgreSQLStorageConfig extends MySQLStorageConfig {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EventQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2UITemplateManagementDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLTableInstaller;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao.PostgreSQLAggregationQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao.PostgreSQLAlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao.PostgreSQLBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao.PostgreSQLLogQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao.PostgreSQLMetricsQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao.PostgreSQLTraceQueryDAO;
/**
* PostgreSQL storage enhanced and came from MySQLStorageProvider to support PostgreSQL.
*/
@Slf4j
public class PostgreSQLStorageProvider extends ModuleProvider {
private PostgreSQLStorageConfig config;
private JDBCHikariCPClient postgresqlClient;
public PostgreSQLStorageProvider() {
config = new PostgreSQLStorageConfig();
}
@Override
public String name() {
return "postgresql";
}
@Override
public Class<? extends ModuleDefine> module() {
return StorageModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
postgresqlClient = new JDBCHikariCPClient(config.getProperties());
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(postgresqlClient));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
getManager(), postgresqlClient, config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag())
);
this.registerServiceImplementation(
INetworkAddressAliasDAO.class, new H2NetworkAddressAliasDAO(postgresqlClient));
this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(postgresqlClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new PostgreSQLMetricsQueryDAO(postgresqlClient));
this.registerServiceImplementation(
ITraceQueryDAO.class,
new PostgreSQLTraceQueryDAO(
getManager(),
postgresqlClient,
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
)
);
this.registerServiceImplementation(IBrowserLogQueryDAO.class, new PostgreSQLBrowserLogQueryDAO(postgresqlClient));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new H2MetadataQueryDAO(postgresqlClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new PostgreSQLAggregationQueryDAO(postgresqlClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new PostgreSQLAlarmQueryDAO(postgresqlClient));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(postgresqlClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new H2TopNRecordsQueryDAO(postgresqlClient));
this.registerServiceImplementation(
ILogQueryDAO.class,
new PostgreSQLLogQueryDAO(
postgresqlClient,
getManager(),
config.getMaxSizeOfArrayColumn(),
config.getNumOfSearchableValuesPerTag()
)
);
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new H2ProfileTaskQueryDAO(postgresqlClient));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new H2ProfileTaskLogQueryDAO(postgresqlClient));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new H2ProfileThreadSnapshotQueryDAO(postgresqlClient));
this.registerServiceImplementation(UITemplateManagementDAO.class, new H2UITemplateManagementDAO(postgresqlClient));
this.registerServiceImplementation(IEventQueryDAO.class, new H2EventQueryDAO(postgresqlClient));
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final ConfigService configService = getManager().find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
final int numOfSearchableTags = configService.getSearchableTracesTags().split(Const.COMMA).length;
if (numOfSearchableTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableTracesTags[" + numOfSearchableTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
final int numOfSearchableLogsTags = configService.getSearchableLogsTags().split(Const.COMMA).length;
if (numOfSearchableLogsTags * config.getNumOfSearchableValuesPerTag() > config.getMaxSizeOfArrayColumn()) {
throw new ModuleStartException("Size of searchableLogsTags[" + numOfSearchableLogsTags
+ "] * numOfSearchableValuesPerTag[" + config.getNumOfSearchableValuesPerTag()
+ "] > maxSizeOfArrayColumn[" + config.getMaxSizeOfArrayColumn()
+ "]. Potential out of bound in the runtime.");
}
try {
postgresqlClient.connect();
MySQLTableInstaller installer = new PostgreSQLTableInstaller(
postgresqlClient, getManager(), config.getMaxSizeOfArrayColumn(), config.getNumOfSearchableValuesPerTag()
);
getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override
public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql;
import com.google.gson.JsonObject;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataComplexObject;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLTableInstaller;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
public class PostgreSQLTableInstaller extends MySQLTableInstaller {
public PostgreSQLTableInstaller(Client client, ModuleManager moduleManager, int maxSizeOfArrayColumn,
int numOfSearchableValuesPerTag) {
super(client, moduleManager, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
}
@Override
protected String transform(ModelColumn column, Class<?> type, Type genericType) {
final String storageName = column.getColumnName().getStorageName();
if (Integer.class.equals(type) || int.class.equals(type) || NodeType.class.equals(type)) {
return storageName + " INT";
} else if (Long.class.equals(type) || long.class.equals(type)) {
return storageName + " BIGINT";
} else if (Double.class.equals(type) || double.class.equals(type)) {
return storageName + " DOUBLE PRECISION";
} else if (String.class.equals(type)) {
return storageName + " VARCHAR(" + column.getLength() + ")";
} else if (StorageDataComplexObject.class.isAssignableFrom(type)) {
return storageName + " VARCHAR(20000)";
} else if (byte[].class.equals(type)) {
return storageName + " TEXT";
} else if (JsonObject.class.equals(type)) {
return storageName + " VARCHAR(" + column.getLength() + ")";
} else if (List.class.isAssignableFrom(type)) {
final Type elementType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
String oneColumnType = transform(column, (Class<?>) elementType, elementType);
// Remove the storageName as prefix
oneColumnType = oneColumnType.substring(storageName.length());
StringBuilder columns = new StringBuilder();
for (int i = 0; i < maxSizeOfArrayColumn; i++) {
columns.append(storageName).append("_").append(i).append(oneColumnType)
.append(i == maxSizeOfArrayColumn - 1 ? "" : ",");
}
return columns.toString();
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
}
@Override
protected String getColumn(final ModelColumn column) {
final String storageName = column.getColumnName().getStorageName();
final Class<?> type = column.getType();
if (StorageDataComplexObject.class.isAssignableFrom(type)) {
return storageName + " TEXT";
} else if (String.class.equals(type)) {
if (column.getLength() > 16383) {
return storageName + " TEXT";
} else {
return storageName + " VARCHAR(" + column.getLength() + ")";
}
}
return super.getColumn(column);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
public class PostgreSQLAggregationQueryDAO extends H2AggregationQueryDAO {
public PostgreSQLAggregationQueryDAO(JDBCHikariCPClient h2Client) {
super(h2Client);
}
@Override
protected StringBuilder buildMetricsValueSql(String valueColumnName, String metricsName) {
StringBuilder sql = new StringBuilder();
sql.append("select * from (select avg(")
.append(valueColumnName)
.append(") as value,")
.append(Metrics.ENTITY_ID)
.append(" from ")
.append(metricsName)
.append(" where ");
return sql;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLAlarmQueryDAO;
public class PostgreSQLAlarmQueryDAO extends MySQLAlarmQueryDAO {
public PostgreSQLAlarmQueryDAO(JDBCHikariCPClient client) {
super(client);
}
@Override
protected void buildLimit(StringBuilder sql, int from, int limit) {
sql.append(" LIMIT ").append(limit);
sql.append(" OFFSET ").append(from);
}
@Override
protected String buildCountStatement(String sql) {
return "select count(*) total from (select 1 " + sql + " ) tempTable ";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BrowserLogQueryDAO;
public class PostgreSQLBrowserLogQueryDAO extends H2BrowserLogQueryDAO {
public PostgreSQLBrowserLogQueryDAO(JDBCHikariCPClient h2Client) {
super(h2Client);
}
@Override
protected String buildCountStatement(String sql) {
return "select count(*) total from (select 1 " + sql + " ) tempTable ";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2LogQueryDAO;
public class PostgreSQLLogQueryDAO extends H2LogQueryDAO {
public PostgreSQLLogQueryDAO(JDBCHikariCPClient h2Client, ModuleManager manager, int maxSizeOfArrayColumn,
int numOfSearchValuesPerTag) {
super(h2Client, manager, maxSizeOfArrayColumn, numOfSearchValuesPerTag);
}
@Override
protected String buildCountStatement(String sql) {
return "select count(*) total from (select 1 " + sql + " ) temp_table ";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricsQueryDAO;
public class PostgreSQLMetricsQueryDAO extends H2MetricsQueryDAO {
public PostgreSQLMetricsQueryDAO(JDBCHikariCPClient h2Client) {
super(h2Client);
}
@Override
protected StringBuilder buildMetricsValueSql(String op, String valueColumnName, String conditionName) {
return new StringBuilder(
"select " + Metrics.ENTITY_ID + " id, " + op + "(" + valueColumnName + ") as value from " + conditionName + " where ");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.dao;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
public class PostgreSQLTraceQueryDAO extends H2TraceQueryDAO {
public PostgreSQLTraceQueryDAO(ModuleManager manager, JDBCHikariCPClient h2Client, int maxSizeOfArrayColumn,
int numOfSearchableValuesPerTag) {
super(manager, h2Client, maxSizeOfArrayColumn, numOfSearchableValuesPerTag);
}
@Override
protected String buildCountStatement(String sql) {
return "select count(*) total from (select 1 " + sql + " ) temp_table ";
}
}
...@@ -17,4 +17,5 @@ ...@@ -17,4 +17,5 @@
# #
org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider
org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLStorageProvider org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLStorageProvider
\ No newline at end of file org.apache.skywalking.oap.server.storage.plugin.jdbc.postgresql.PostgreSQLStorageProvider
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
postgres:
image: postgres:13
networks:
- e2e
expose:
- 5432
environment:
- POSTGRES_PASSWORD=123456
- POSTGRES_DB=skywalking
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5432"]
interval: 5s
timeout: 60s
retries: 120
oap:
extends:
file: ../base-compose.yml
service: oap
environment:
SW_STORAGE: postgresql
SW_PROMETHEUS_FETCHER: "default"
SW_JDBC_URL: "jdbc:postgresql://postgres:5432/skywalking"
SW_TELEMETRY: prometheus
depends_on:
postgres:
condition: service_healthy
networks:
e2e:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
postgres:
image: postgres:13
networks:
- e2e
expose:
- 5432
environment:
- POSTGRES_PASSWORD=123456
- POSTGRES_DB=skywalking
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5432"]
interval: 5s
timeout: 60s
retries: 120
oap:
extends:
file: ../base-compose.yml
service: oap
environment:
SW_STORAGE: postgresql
SW_PROMETHEUS_FETCHER: "default"
SW_JDBC_URL: "jdbc:postgresql://postgres:5432/skywalking"
SW_TELEMETRY: prometheus
depends_on:
postgres:
condition: service_healthy
networks:
e2e:
...@@ -173,3 +173,4 @@ snappy-java-1.1.7.3.jar ...@@ -173,3 +173,4 @@ snappy-java-1.1.7.3.jar
zstd-jni-1.4.3-1.jar zstd-jni-1.4.3-1.jar
mvel2-2.4.8.Final.jar mvel2-2.4.8.Final.jar
commons-beanutils-1.9.4.jar commons-beanutils-1.9.4.jar
postgresql-42.2.18.jar
\ No newline at end of file
...@@ -171,3 +171,4 @@ snappy-java-1.1.7.3.jar ...@@ -171,3 +171,4 @@ snappy-java-1.1.7.3.jar
zstd-jni-1.4.3-1.jar zstd-jni-1.4.3-1.jar
mvel2-2.4.8.Final.jar mvel2-2.4.8.Final.jar
commons-beanutils-1.9.4.jar commons-beanutils-1.9.4.jar
postgresql-42.2.18.jar
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册