Unverified commit 329f7e15, authored by wu-sheng and committed by GitHub

Support Downsampling Data Packing feature in ES storage implementation (#4364)

* Support merging day/hour/minute metrics into one index, reducing the number of indexes by 50%.
Parent 5fed153e
......@@ -82,6 +82,7 @@ storage:
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
#    enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into the minute index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......@@ -104,6 +105,7 @@ storage:
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
#    enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into the minute index.
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
# Why do metrics indexes in Hour and Day precisions stop updating after upgrading to 7.x?
This is expected behavior when upgrading from 6.x to 7.x.
Read the [Downsampling Data Packing feature](../setup/backend/backend-storage.md#downsampling-data-packing)
section of the ElasticSearch storage documentation.
Users can simply delete all expired `*-day_xxxxx` and `*-hour_xxxxx` (`xxxxx` is a timestamp) indexes.
SkyWalking now uses only the `metrics name-xxxxx` and `metrics name-month_xxxxx` indexes.
\ No newline at end of file
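For reference, here is a minimal sketch of deleting one such expired index with the ES high-level REST client, matching the delete pattern used elsewhere in this change; `service_cpm-day_20200220` is a hypothetical index name:
```java
import java.io.IOException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.client.RestHighLevelClient;

// Sketch: drop an expired 6.x day-precision index (hypothetical name).
static boolean deleteExpiredIndex(RestHighLevelClient client) throws IOException {
    DeleteIndexRequest request = new DeleteIndexRequest("service_cpm-day_20200220");
    DeleteIndexResponse response = client.indices().delete(request);
    return response.isAcknowledged();
}
```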
......@@ -9,6 +9,7 @@ These are known and common FAQs. We welcome you to contribute yours.
* [Required items could not be found, when import project into Eclipse](Import-Project-Eclipse-RequireItems-Exception.md)
## Runtime
* [Why do metrics indexes (ElasticSearch) in Hour and Day precisions stop updating after upgrading to 7.x?](Hour-Day-Metrics-Stopping.md)
* [6.x version upgrade](v6-version-upgrade.md)
* [Why only traces in UI?](Why-have-traces-no-others.md)
* [Too many GRPC logs in the console](Too-many-gRPC-logs.md)
......
......@@ -47,6 +47,7 @@ storage:
# password: ${SW_ES_PASSWORD:""} # Password to be set when Http Basic authentication is enabled
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:""}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into the minute index.
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......@@ -67,6 +68,14 @@ storage:
advanced: ${SW_STORAGE_ES_ADVANCED:""}
```
### Downsampling Data Packing
Downsampling data packing (`storage/elasticsearch/enablePackedDownsampling`, activated by default) is a new feature since 7.0.0.
Metrics data has four different precisions, based on the `core/default/downsampling` configuration.
Previously (in 6.x), every precision of each metric had a separate
index. Once this feature is activated, metrics of day and hour precisions are merged into the minute precision index. The number of indexes
decreases, which reduces the load on the ElasticSearch server.
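The packing happens purely at the index-name level, before any request is sent. Below is a minimal sketch of the idea (the `_hour`/`_day` suffixes and the `service_cpm` index names are illustrative assumptions; the real converter is `PackedDownsamplingConverter` in the ES storage provider): a converter strips the hour/day precision suffix, so those documents are written into the same physical index as minute data.
```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch, assuming "_hour"/"_day" suffixes and hypothetical index names.
public class PackedDownsamplingSketch {
    private static final List<String> REMOVABLE_SUFFIXES = Arrays.asList("_day", "_hour");

    static String pack(String indexName) {
        for (String suffix : REMOVABLE_SUFFIXES) {
            if (indexName.endsWith(suffix)) {
                // Drop the precision suffix so hour/day data shares the minute index.
                return indexName.substring(0, indexName.length() - suffix.length());
            }
        }
        return indexName; // minute and month indexes are left untouched
    }

    public static void main(String[] args) {
        System.out.println(pack("service_cpm_hour"));  // -> service_cpm
        System.out.println(pack("service_cpm_day"));   // -> service_cpm
        System.out.println(pack("service_cpm_month")); // -> service_cpm_month (unchanged)
    }
}
```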
### ElasticSearch 6 with HTTPS SSL-encrypted communications
Example:
......
......@@ -83,6 +83,7 @@ storage:
# #trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
#    enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into the minute index.
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Those data TTL settings will override the same settings in core module.
......@@ -104,6 +105,7 @@ storage:
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
#trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
#trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
enablePackedDownsampling: ${SW_STORAGE_ENABLE_PACKED_DOWNSAMPLING:true} # Hour and Day metrics will be merged into the minute index.
user: ${SW_ES_USER:""}
password: ${SW_ES_PASSWORD:""}
indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
......
......@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
......@@ -83,30 +84,34 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ElasticSearchClient connects to the ES server by using ES client APIs.
*/
@Slf4j
public class ElasticSearchClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
public static final String TYPE = "type";
protected final String clusterNodes;
protected final String protocol;
private final String trustStorePath;
private final String trustStorePass;
private final String namespace;
private final String user;
private final String password;
private final List<IndexNameConverter> indexNameConverters;
protected RestHighLevelClient client;
public ElasticSearchClient(String clusterNodes, String protocol, String trustStorePath, String trustStorePass,
String namespace, String user, String password) {
public ElasticSearchClient(String clusterNodes,
String protocol,
String trustStorePath,
String trustStorePass,
String user,
String password,
List<IndexNameConverter> indexNameConverters) {
this.clusterNodes = clusterNodes;
this.protocol = protocol;
this.namespace = namespace;
this.user = user;
this.password = password;
this.indexNameConverters = indexNameConverters;
this.trustStorePath = trustStorePath;
this.trustStorePass = trustStorePass;
}
......@@ -127,7 +132,9 @@ public class ElasticSearchClient implements Client {
if (StringUtil.isEmpty(trustStorePath)) {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
credentialsProvider));
} else {
KeyStore truststore = KeyStore.getInstance("jks");
try (InputStream is = Files.newInputStream(Paths.get(trustStorePath))) {
......@@ -136,8 +143,10 @@ public class ElasticSearchClient implements Client {
SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);
final SSLContext sslContext = sslBuilder.build();
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
.setSSLContext(sslContext));
.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
credentialsProvider)
.setSSLContext(sslContext));
}
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
......@@ -153,7 +162,7 @@ public class ElasticSearchClient implements Client {
public static List<HttpHost> parseClusterNodes(String protocol, String nodes) {
List<HttpHost> httpHosts = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes);
log.info("elasticsearch cluster nodes: {}", nodes);
List<String> nodesSplit = Splitter.on(",").omitEmptyStrings().splitToList(nodes);
for (String node : nodesSplit) {
......@@ -170,19 +179,19 @@ public class ElasticSearchClient implements Client {
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexResponse response = client.indices().create(request);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean createIndex(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
Gson gson = new Gson();
request.settings(gson.toJson(settings), XContentType.JSON);
request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON);
CreateIndexResponse response = client.indices().create(request);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
log.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
......@@ -193,7 +202,7 @@ public class ElasticSearchClient implements Client {
Gson gson = new Gson();
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
logger.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
return new ArrayList<>(responseJson.keySet());
}
return Collections.emptyList();
......@@ -227,7 +236,7 @@ public class ElasticSearchClient implements Client {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
DeleteIndexResponse response;
response = client.indices().delete(request);
logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
log.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
......@@ -249,12 +258,13 @@ public class ElasticSearchClient implements Client {
} else if (statusCode == HttpStatus.SC_NOT_FOUND) {
return false;
} else {
throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
throw new IOException(
"The response status code of template exists request should be 200 or 404, but it is " + statusCode);
}
}
public boolean createTemplate(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
String[] patterns = new String[] {indexName + "-*"};
......@@ -271,7 +281,8 @@ public class ElasticSearchClient implements Client {
HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient()
.performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
.performRequest(
HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
}
......@@ -313,14 +324,16 @@ public class ElasticSearchClient implements Client {
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
indexName, id, source);
request.version(version);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
......@@ -341,8 +354,9 @@ public class ElasticSearchClient implements Client {
String jsonString = "{" + " \"query\": {" + " \"range\": {" + " \"" + timeBucketColumnName + "\": {" + " \"lte\": " + endTimeBucket + " }" + " }" + " }" + "}";
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient()
.performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
.performRequest(
HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
log.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
return response.getStatusLine().getStatusCode();
}
......@@ -353,9 +367,9 @@ public class ElasticSearchClient implements Client {
try {
int size = request.requests().size();
BulkResponse responses = client.bulk(request);
logger.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
} catch (IOException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
......@@ -375,31 +389,34 @@ public class ElasticSearchClient implements Client {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
log.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
log.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook()
.getMillis(), request
.requests()
.size());
log.info(
"Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook()
.getMillis(),
request
.requests()
.size()
);
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
log.error("Failed to execute bulk", failure);
}
};
}
public String formatIndexName(String indexName) {
if (StringUtil.isNotEmpty(namespace)) {
return namespace + "_" + indexName;
for (final IndexNameConverter indexNameConverter : indexNameConverters) {
indexName = indexNameConverter.convert(indexName);
}
return indexName;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client.elasticsearch;
/**
* Implementations of this interface convert ElasticSearch index names.
*/
public interface IndexNameConverter {
String convert(String name);
}
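// A converter is a simple name-mapping hook; ElasticSearchClient#formatIndexName
// applies every registered converter in order. A minimal sketch of a custom
// converter and the chaining follows; the "dev-" prefix is a hypothetical example.
import java.util.Arrays;
import java.util.List;

class EnvPrefixConverter implements IndexNameConverter {
    @Override
    public String convert(String name) {
        // Prefix every index with a hypothetical environment tag.
        return "dev-" + name;
    }
}

// Chaining, mirroring ElasticSearchClient#formatIndexName:
List<IndexNameConverter> converters = Arrays.asList(new EnvPrefixConverter());
String indexName = "service_cpm";
for (IndexNameConverter converter : converters) {
    indexName = converter.convert(indexName);
}
// indexName is now "dev-service_cpm"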
......@@ -22,6 +22,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -67,7 +68,9 @@ public class ITElasticSearchClient {
public void before() throws Exception {
final String esAddress = System.getProperty("elastic.search.address");
final String esProtocol = System.getProperty("elastic.search.protocol");
client = new ElasticSearchClient(esAddress, esProtocol, "", "", namespace, "test", "test");
client = new ElasticSearchClient(esAddress, esProtocol, "", "", "test", "test",
indexNameConverters(namespace)
);
client.connect();
}
......@@ -270,12 +273,36 @@ public class ITElasticSearchClient {
index.add(oldIndexName.substring(namespacePrefix.length()), entry.getValue());
index.remove(oldIndexName);
} else {
throw new RuntimeException("The indexName must contain the " + namespace + " prefix, but it is " + entry
.getKey());
throw new RuntimeException(
"The indexName must contain the " + namespace + " prefix, but it is " + entry
.getKey());
}
});
logger.info("UndoFormatIndexName after " + index.toString());
}
return index;
}
private static List<IndexNameConverter> indexNameConverters(String namespace) {
List<IndexNameConverter> converters = new ArrayList<>();
converters.add(new NamespaceConverter(namespace));
return converters;
}
private static class NamespaceConverter implements IndexNameConverter {
private final String namespace;
public NamespaceConverter(final String namespace) {
this.namespace = namespace;
}
@Override
public String convert(final String indexName) {
if (StringUtil.isNotEmpty(namespace)) {
return namespace + "_" + indexName;
}
return indexName;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.mock;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.network.common.Commands;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.v2.TraceSegmentReportServiceGrpc;
public class AgentDataMock {
private static boolean IS_COMPLETED = false;
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext().build();
RegisterMock registerMock = new RegisterMock(channel);
StreamObserver<UpstreamSegment> streamObserver = createStreamObserver();
UniqueId.Builder globalTraceId = UniqueIdBuilder.INSTANCE.create();
long startTimestamp = System.currentTimeMillis();
//long startTimestamp = new DateTime().minusDays(2).getMillis();
// ServiceAMock
ServiceAMock serviceAMock = new ServiceAMock(registerMock);
serviceAMock.register();
// ServiceBMock
ServiceBMock serviceBMock = new ServiceBMock(registerMock);
serviceBMock.register();
// ServiceCMock
ServiceCMock serviceCMock = new ServiceCMock(registerMock);
serviceCMock.register();
UniqueId.Builder serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, true);
UniqueId.Builder serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, true);
UniqueId.Builder serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, true);
TimeUnit.SECONDS.sleep(10);
for (int i = 0; i < 500; i++) {
globalTraceId = UniqueIdBuilder.INSTANCE.create();
serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, true);
serviceBMock.mock(
streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, true);
serviceCMock.mock(
streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, true);
}
streamObserver.onCompleted();
while (!IS_COMPLETED) {
TimeUnit.MILLISECONDS.sleep(500);
}
}
private static StreamObserver<UpstreamSegment> createStreamObserver() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext().build();
TraceSegmentReportServiceGrpc.TraceSegmentReportServiceStub stub = TraceSegmentReportServiceGrpc.newStub(
channel);
return stub.collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands downstream) {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
IS_COMPLETED = true;
}
});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.mock;
import io.grpc.ManagedChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.network.common.KeyIntValuePair;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.common.ServiceType;
import org.apache.skywalking.apm.network.register.v2.RegisterGrpc;
import org.apache.skywalking.apm.network.register.v2.Service;
import org.apache.skywalking.apm.network.register.v2.ServiceInstance;
import org.apache.skywalking.apm.network.register.v2.ServiceInstanceRegisterMapping;
import org.apache.skywalking.apm.network.register.v2.ServiceInstances;
import org.apache.skywalking.apm.network.register.v2.ServiceRegisterMapping;
import org.apache.skywalking.apm.network.register.v2.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class RegisterMock {
private static final Logger logger = LoggerFactory.getLogger(RegisterMock.class);
private final RegisterGrpc.RegisterBlockingStub registerStub;
RegisterMock(ManagedChannel channel) {
registerStub = RegisterGrpc.newBlockingStub(channel);
}
int registerService(String serviceName) throws InterruptedException {
Services.Builder services = Services.newBuilder();
services.addServices(Service
.newBuilder()
.setServiceName(serviceName)
.setType(ServiceType.normal))
.build();
ServiceRegisterMapping serviceRegisterMapping;
int serviceId = 0;
do {
serviceRegisterMapping = registerStub.doServiceRegister(services.build());
List<KeyIntValuePair> servicesList = serviceRegisterMapping.getServicesList();
if (servicesList.size() > 0) {
serviceId = servicesList.get(0).getValue();
logger.debug("service id: {}", serviceId);
}
TimeUnit.MILLISECONDS.sleep(20);
}
while (serviceId == 0);
return serviceId;
}
int registerServiceInstance(int serviceId, String agentName) throws InterruptedException {
ServiceInstances.Builder instances = ServiceInstances.newBuilder();
instances.addInstances(ServiceInstance.newBuilder()
.setServiceId(serviceId)
.setInstanceUUID(agentName)
.setTime(System.currentTimeMillis())
.addAllProperties(buildOSInfo())
);
ServiceInstanceRegisterMapping instanceMapping;
int instanceId = 0;
do {
instanceMapping = registerStub.doServiceInstanceRegister(instances.build());
List<KeyIntValuePair> serviceInstancesList = instanceMapping.getServiceInstancesList();
if (serviceInstancesList.size() > 0) {
instanceId = serviceInstancesList.get(0).getValue();
logger.debug("instance id: {}", instanceId);
}
TimeUnit.MILLISECONDS.sleep(20);
}
while (instanceId == 0);
return instanceId;
}
public static List<KeyStringValuePair> buildOSInfo() {
List<KeyStringValuePair> osInfo = new ArrayList<KeyStringValuePair>();
osInfo.add(KeyStringValuePair.newBuilder().setKey("os_name").setValue("osName").build());
osInfo.add(KeyStringValuePair.newBuilder().setKey("host_name").setValue("hostName").build());
osInfo.add(KeyStringValuePair.newBuilder().setKey("ipv4").setValue("ipv4").build());
osInfo.add(KeyStringValuePair.newBuilder().setKey("process_no").setValue("123").build());
osInfo.add(KeyStringValuePair.newBuilder().setKey("language").setValue("java").build());
return osInfo;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.mock;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
class ServiceAMock {
static String REST_ENDPOINT = "/dubbox-case/case/dubbox-rest";
static String DUBBO_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()";
static String DUBBO_ADDRESS = "DubboIPAddress:1000";
private final RegisterMock registerMock;
private static int SERVICE_ID;
static int SERVICE_INSTANCE_ID;
ServiceAMock(RegisterMock registerMock) {
this.registerMock = registerMock;
}
void register() throws InterruptedException {
SERVICE_ID = registerMock.registerService("dubbox-consumer");
SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysA");
}
void mock(StreamObserver<UpstreamSegment> streamObserver, UniqueId.Builder traceId,
UniqueId.Builder segmentId, long startTimestamp, boolean isPrepare) {
UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
upstreamSegment.addGlobalTraceIds(traceId);
upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, isPrepare));
streamObserver.onNext(upstreamSegment.build());
}
private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId, boolean isPrepare) {
SegmentObject.Builder segment = SegmentObject.newBuilder();
segment.setTraceSegmentId(segmentId);
segment.setServiceId(SERVICE_ID);
segment.setServiceInstanceId(SERVICE_INSTANCE_ID);
segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
segment.addSpans(createExitSpan(startTimestamp, isPrepare));
return segment.build().toByteString();
}
private SpanObjectV2.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
span.setSpanId(0);
span.setSpanType(SpanType.Entry);
span.setSpanLayer(SpanLayer.Http);
span.setParentSpanId(-1);
span.setStartTime(startTimestamp);
span.setEndTime(startTimestamp + 6000);
span.setComponentId(ComponentsDefine.TOMCAT.getId());
if (isPrepare) {
span.setOperationName(REST_ENDPOINT);
} else {
span.setOperationNameId(2);
}
span.setIsError(false);
return span;
}
private SpanObjectV2.Builder createLocalSpan(long startTimestamp, boolean isPrepare) {
SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
span.setSpanId(1);
span.setSpanType(SpanType.Local);
span.setParentSpanId(0);
span.setStartTime(startTimestamp + 100);
span.setEndTime(startTimestamp + 500);
span.setOperationName("org.apache.skywalking.Local.do");
span.setIsError(false);
return span;
}
private SpanObjectV2.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
span.setSpanId(2);
span.setSpanType(SpanType.Exit);
span.setSpanLayer(SpanLayer.RPCFramework);
span.setParentSpanId(1);
span.setStartTime(startTimestamp + 120);
span.setEndTime(startTimestamp + 5800);
span.setComponentId(ComponentsDefine.DUBBO.getId());
span.setOperationName(DUBBO_ENDPOINT);
if (isPrepare) {
span.setPeer(DUBBO_ADDRESS);
} else {
span.setPeerId(2);
}
span.setIsError(false);
return span;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.mock;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.RefType;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference;
import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
class ServiceBMock {
private final RegisterMock registerMock;
private static int SERVICE_ID;
static int SERVICE_INSTANCE_ID;
static String DUBBO_PROVIDER_ENDPOINT = "org.skywaking.apm.testcase.dubbo.services.GreetServiceImpl.doBusiness()";
static String ROCKET_MQ_ENDPOINT = "org.apache.skywalking.RocketMQ";
static String ROCKET_MQ_ADDRESS = "RocketMQAddress:2000";
ServiceBMock(RegisterMock registerMock) {
this.registerMock = registerMock;
}
void register() throws InterruptedException {
SERVICE_ID = registerMock.registerService("dubbox-provider");
SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysB");
}
void mock(StreamObserver<UpstreamSegment> streamObserver,
UniqueId.Builder traceId,
UniqueId.Builder segmentId,
UniqueId.Builder parentTraceSegmentId,
long startTimestamp,
boolean isPrepare) {
UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
upstreamSegment.addGlobalTraceIds(traceId);
upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId, isPrepare));
streamObserver.onNext(upstreamSegment.build());
}
private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
SegmentObject.Builder segment = SegmentObject.newBuilder();
segment.setTraceSegmentId(segmentId);
segment.setServiceId(SERVICE_ID);
segment.setServiceInstanceId(SERVICE_INSTANCE_ID);
segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare));
segment.addSpans(createExitSpan(startTimestamp, isPrepare));
segment.addSpans(createMQExitSpan(startTimestamp, isPrepare));
return segment.build().toByteString();
}
private SegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
SegmentReference.Builder reference = SegmentReference.newBuilder();
reference.setParentTraceSegmentId(parentTraceSegmentId);
reference.setParentServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID);
reference.setParentSpanId(2);
reference.setEntryServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID);
reference.setRefType(RefType.CrossProcess);
if (isPrepare) {
reference.setParentEndpoint(ServiceAMock.REST_ENDPOINT);
reference.setNetworkAddress(ServiceAMock.DUBBO_ADDRESS);
reference.setEntryEndpoint(ServiceAMock.REST_ENDPOINT);
} else {
reference.setParentEndpointId(2);
reference.setNetworkAddressId(2);
reference.setEntryEndpointId(2);
}
return reference;
}
private SpanObjectV2.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
span.setSpanId(0);
span.setSpanType(SpanType.Entry);
span.setSpanLayer(SpanLayer.RPCFramework);
span.setParentSpanId(-1);
span.setStartTime(startTimestamp + 500);
span.setEndTime(startTimestamp + 5000);
span.setComponentId(ComponentsDefine.DUBBO.getId());
span.setIsError(false);
span.addRefs(createReference(uniqueId, isPrepare));
if (isPrepare) {
span.setOperationName(ServiceBMock.DUBBO_PROVIDER_ENDPOINT);
} else {
span.setOperationNameId(4);
}
return span;
}
private SpanObjectV2.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
span.setSpanId(1);
span.setSpanType(SpanType.Exit);
span.setSpanLayer(SpanLayer.Database);
span.setParentSpanId(0);
span.setStartTime(startTimestamp + 550);
span.setEndTime(startTimestamp + 1500);
span.setComponentId(ComponentsDefine.MONGO_DRIVER.getId());
span.setIsError(true);
span.addTags(KeyStringValuePair.newBuilder()
.setKey("db.statement")
.setValue("select * from database where complex = 1;")
.build());
span.addTags(KeyStringValuePair.newBuilder().setKey("db.type").setValue("mongodb").build());
span.setOperationName(
"mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]");
if (isPrepare) {
span.setPeer("localhost:27017");
} else {
span.setPeerId(3);
}
return span;
}
private SpanObjectV2.Builder createMQExitSpan(long startTimestamp, boolean isPrepare) {
SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
span.setSpanId(2);
span.setSpanType(SpanType.Exit);
span.setSpanLayer(SpanLayer.MQ);
span.setParentSpanId(1);
span.setStartTime(startTimestamp + 1100);
span.setEndTime(startTimestamp + 1500);
span.setComponentId(ComponentsDefine.ROCKET_MQ_PRODUCER.getId());
span.setIsError(false);
span.setOperationName(ROCKET_MQ_ENDPOINT);
if (isPrepare) {
span.setPeer(ROCKET_MQ_ADDRESS);
} else {
span.setPeerId(4);
}
return span;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.mock;
import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.language.agent.RefType;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentReference;
import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
class ServiceCMock {
private final RegisterMock registerMock;
private static int SERVICE_ID;
private static int SERVICE_INSTANCE_ID;
ServiceCMock(RegisterMock registerMock) {
this.registerMock = registerMock;
}
void register() throws InterruptedException {
SERVICE_ID = registerMock.registerService("rocket-mq-consumer");
SERVICE_INSTANCE_ID = registerMock.registerServiceInstance(SERVICE_ID, "pengysC");
}
void mock(StreamObserver<UpstreamSegment> streamObserver,
UniqueId.Builder traceId,
UniqueId.Builder segmentId,
UniqueId.Builder parentTraceSegmentId,
long startTimestamp,
boolean isPrepare) {
UpstreamSegment.Builder upstreamSegment = UpstreamSegment.newBuilder();
upstreamSegment.addGlobalTraceIds(traceId);
upstreamSegment.setSegment(createSegment(startTimestamp, segmentId, parentTraceSegmentId, isPrepare));
streamObserver.onNext(upstreamSegment.build());
}
private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId,
UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
SegmentObject.Builder segment = SegmentObject.newBuilder();
segment.setTraceSegmentId(segmentId);
segment.setServiceInstanceId(SERVICE_INSTANCE_ID);
segment.setServiceId(SERVICE_ID);
segment.addSpans(createEntrySpan(startTimestamp, parentTraceSegmentId, isPrepare));
return segment.build().toByteString();
}
private SpanObjectV2.Builder createEntrySpan(long startTimestamp, UniqueId.Builder uniqueId, boolean isPrepare) {
SpanObjectV2.Builder span = SpanObjectV2.newBuilder();
span.setSpanId(0);
span.setSpanType(SpanType.Entry);
span.setSpanLayer(SpanLayer.MQ);
span.setParentSpanId(-1);
span.setStartTime(startTimestamp + 3000);
span.setEndTime(startTimestamp + 5000);
span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
span.setIsError(false);
span.addRefs(createReference(uniqueId, isPrepare));
if (isPrepare) {
span.setOperationName(ServiceBMock.ROCKET_MQ_ENDPOINT);
} else {
span.setOperationNameId(5);
}
return span;
}
private SegmentReference.Builder createReference(UniqueId.Builder parentTraceSegmentId, boolean isPrepare) {
SegmentReference.Builder reference = SegmentReference.newBuilder();
reference.setParentTraceSegmentId(parentTraceSegmentId);
reference.setParentServiceInstanceId(ServiceBMock.SERVICE_INSTANCE_ID);
reference.setParentSpanId(2);
reference.setEntryServiceInstanceId(ServiceAMock.SERVICE_INSTANCE_ID);
reference.setRefType(RefType.CrossProcess);
if (isPrepare) {
reference.setParentEndpoint(ServiceBMock.DUBBO_PROVIDER_ENDPOINT);
reference.setNetworkAddress(ServiceBMock.ROCKET_MQ_ADDRESS);
reference.setEntryEndpoint(ServiceAMock.REST_ENDPOINT);
} else {
reference.setParentEndpointId(8);
reference.setNetworkAddressId(4);
reference.setEntryEndpointId(2);
}
return reference;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.mock;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
public enum UniqueIdBuilder {
INSTANCE;
private AtomicLong idPart = new AtomicLong(1);
UniqueId.Builder create() {
UniqueId.Builder uniqueId = UniqueId.newBuilder();
uniqueId.addIdParts(idPart.getAndIncrement());
uniqueId.addIdParts(idPart.getAndIncrement());
uniqueId.addIdParts(idPart.getAndIncrement());
return uniqueId;
}
}
......@@ -55,6 +55,16 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Getter
@Setter
String trustStorePass;
/**
 * If this is ON, the downsampling indexes (hour and day precisions) are merged into the minute precision index. In
 * this case, only {@link #minuteMetricsDataTTL} applies to minute, hour and day data.
 *
 * @since 7.0.0 This is an enhancement. It reduces the number of indexes by 50% (removing the day/hour index
 * requirements) while keeping performance nearly the same as before. The only side effect for 6.x storage is that
 * the old day/hour indexes remain; users need to remove them manually.
 */
@Getter
private boolean enablePackedDownsampling = true;
@Setter
private int resultWindowMaxSize = 10000;
@Setter
......
......@@ -23,8 +23,14 @@ import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
......@@ -48,6 +54,7 @@ import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
......@@ -106,38 +113,54 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
elasticSearchClient = new ElasticSearchClient(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace(), config.isEnablePackedDownsampling())
);
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient, new ElasticsearchStorageTTL()));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearchClient,
new ElasticsearchStorageTTL()
));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient, config
.getResultWindowMaxSize()));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient, config
.getResultWindowMaxSize()));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient, config
.getResultWindowMaxSize()));
this.registerServiceImplementation(
IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient, config
.getResultWindowMaxSize()));
this.registerServiceImplementation(
IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheDAO(elasticSearchClient, config
.getResultWindowMaxSize()));
this.registerServiceImplementation(
IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEsDAO(elasticSearchClient));
this.registerServiceImplementation(
INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEsDAO(elasticSearchClient, config
.getResultWindowMaxSize()));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(
ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(
IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(
IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config
.getProfileTaskQueryMaxSize()));
}
@Override
......@@ -174,4 +197,61 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
configService.getDataTTLConfig().setDayMetricsDataTTL(config.getDayMetricsDataTTL());
configService.getDataTTLConfig().setMonthMetricsDataTTL(config.getMonthMetricsDataTTL());
}
public static List<IndexNameConverter> indexNameConverters(String namespace, boolean enablePackedDownsampling) {
List<IndexNameConverter> converters = new ArrayList<>();
if (enablePackedDownsampling) {
// Packed downsampling converter.
converters.add(new PackedDownsamplingConverter());
}
converters.add(new NamespaceConverter(namespace));
return converters;
}
private static class PackedDownsamplingConverter implements IndexNameConverter {
private final String[] removableSuffixes = new String[] {
Const.ID_SPLIT + Downsampling.Day.getName(),
Const.ID_SPLIT + Downsampling.Hour.getName()
};
private final Map<String, String> convertedIndexNames = new ConcurrentHashMap<>();
public PackedDownsamplingConverter() {
}
@Override
public String convert(final String indexName) {
String convertedName = convertedIndexNames.get(indexName);
if (convertedName != null) {
return convertedName;
}
convertedName = indexName;
for (final String removableSuffix : removableSuffixes) {
String mayReplaced = indexName.replaceAll(removableSuffix, "");
if (mayReplaced.length() != convertedName.length()) {
convertedName = mayReplaced;
break;
}
}
convertedIndexNames.put(indexName, convertedName);
return convertedName;
}
}
private static class NamespaceConverter implements IndexNameConverter {
private final String namespace;
public NamespaceConverter(final String namespace) {
this.namespace = namespace;
}
@Override
public String convert(final String indexName) {
if (StringUtil.isNotEmpty(namespace)) {
return namespace + "_" + indexName;
}
return indexName;
}
}
}
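// Rough usage sketch of the converter chain above, assuming Const.ID_SPLIT is "_"
// and the Downsampling hour/day names are "hour"/"day" (both come from the core
// module); the index name is hypothetical. With namespace "prod" and packed
// downsampling enabled, an hour-precision index name collapses into the minute
// index and then gains the namespace prefix:
List<IndexNameConverter> converters =
    StorageModuleElasticsearchProvider.indexNameConverters("prod", true);

String name = "service_cpm_hour"; // hypothetical model index name
for (IndexNameConverter converter : converters) {
    name = converter.convert(name);
}
// name == "prod_service_cpm": hour data is written into the packed minute index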
......@@ -76,6 +76,8 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.Metr
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.ProfileThreadSnapshotQueryEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.TraceQueryEs7DAO;
import static org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchProvider.indexNameConverters;
public class StorageModuleElasticsearch7Provider extends ModuleProvider {
protected final StorageModuleElasticsearch7Config config;
......@@ -106,31 +108,68 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
elasticSearch7Client = new ElasticSearch7Client(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
elasticSearch7Client = new ElasticSearch7Client(
config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
.getTrustStorePass(), config.getUser(), config.getPassword(),
indexNameConverters(config.getNameSpace(), config.isEnablePackedDownsampling())
);
this.registerServiceImplementation(
IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(),
config.getFlushInterval(), config.getConcurrentRequests()
));
this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client, new ElasticsearchStorageTTL()));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(
IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client,
new ElasticsearchStorageTTL()
));
this.registerServiceImplementation(
IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(
elasticSearch7Client,
config.getResultWindowMaxSize()
));
this.registerServiceImplementation(
IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(
elasticSearch7Client,
config.getResultWindowMaxSize()
));
this.registerServiceImplementation(
IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(
INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(
elasticSearch7Client,
config.getResultWindowMaxSize()
));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(
ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(
IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(
IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(elasticSearch7Client, config.getProfileTaskQueryMaxSize()));
this.registerServiceImplementation(
IProfileTaskQueryDAO.class, new ProfileTaskQueryEsDAO(
elasticSearch7Client,
config.getProfileTaskQueryMaxSize()
));
this.registerServiceImplementation(
IProfileTaskLogQueryDAO.class, new ProfileTaskLogEsDAO(
elasticSearch7Client,
config.getProfileTaskQueryMaxSize()
));
this.registerServiceImplementation(
IProfileThreadSnapshotQueryDAO.class, new ProfileThreadSnapshotQueryEs7DAO(
elasticSearch7Client,
config.getProfileTaskQueryMaxSize()
));
}
@Override
......
......@@ -18,9 +18,19 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexNameConverter;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
......@@ -55,23 +65,21 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class ElasticSearch7Client extends ElasticSearchClient {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Client.class);
public ElasticSearch7Client(final String clusterNodes, final String protocol, final String trustStorePath,
final String trustStorePass, final String namespace, final String user, final String password) {
super(clusterNodes, protocol, trustStorePath, trustStorePass, namespace, user, password);
public ElasticSearch7Client(final String clusterNodes,
final String protocol,
final String trustStorePath,
final String trustStorePass,
final String user,
final String password,
List<IndexNameConverter> indexNameConverters) {
super(
clusterNodes, protocol, trustStorePath, trustStorePass, user, password,
indexNameConverters
);
}
@Override
......@@ -91,7 +99,7 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public boolean createIndex(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings);
......@@ -138,11 +146,13 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public boolean createTemplate(String indexName, Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexName).patterns(Collections.singletonList(indexName + "-*"))
.alias(new Alias(indexName))
PutIndexTemplateRequest putIndexTemplateRequest = new PutIndexTemplateRequest(indexName).patterns(
Collections.singletonList(indexName + "-*"))
.alias(new Alias(
indexName))
.settings(settings)
.mapping(mapping);
......@@ -157,7 +167,8 @@ public class ElasticSearch7Client extends ElasticSearchClient {
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(indexName);
AcknowledgedResponse acknowledgedResponse = client.indices()
.deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT);
.deleteTemplate(
deleteIndexTemplateRequest, RequestOptions.DEFAULT);
return acknowledgedResponse.isAcknowledged();
}
......@@ -190,8 +201,9 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long seqNo,
long primaryTerm) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
long primaryTerm) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
indexName, id, source);
request.setIfSeqNo(seqNo);
request.setIfPrimaryTerm(primaryTerm);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
......@@ -199,7 +211,8 @@ public class ElasticSearch7Client extends ElasticSearchClient {
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request, RequestOptions.DEFAULT);
}
......@@ -221,7 +234,10 @@ public class ElasticSearch7Client extends ElasticSearchClient {
deleteByQueryRequest.setAbortOnVersionConflict(false);
deleteByQueryRequest.setQuery(QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket));
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
logger.debug("delete indexName: {}, by query request: {}, response: {}", indexName, deleteByQueryRequest, bulkByScrollResponse);
logger.debug(
"delete indexName: {}, by query request: {}, response: {}", indexName, deleteByQueryRequest,
bulkByScrollResponse
);
return HttpStatus.SC_OK;
}
......@@ -241,7 +257,10 @@ public class ElasticSearch7Client extends ElasticSearchClient {
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = createBulkListener();
return BulkProcessor.builder((bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener), listener)
return BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT,
bulkResponseActionListener
), listener)
.setBulkActions(bulkActions)
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
.setConcurrentRequests(concurrentRequests)
......