提交 73e8853f 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Provide elasticsearch template operation method. (#2474)

* Provide elasticsearch template operation method.

* Fixed compile failure issue.
上级 a0ef72cf
......@@ -48,9 +48,5 @@ public abstract class CPMIndicator extends Indicator implements LongValueHolder
@Override public void calculate() {
this.value = total / getDurationInMinute();
}
@Override public long getValue() {
return value;
}
}
......@@ -45,8 +45,4 @@ public abstract class CountIndicator extends Indicator implements LongValueHolde
@Override public void calculate() {
}
@Override public long getValue() {
return value;
}
}
......@@ -51,8 +51,4 @@ public abstract class DoubleAvgIndicator extends Indicator implements DoubleValu
@Override public final void calculate() {
this.value = this.summation / this.count;
}
@Override public double getValue() {
return value;
}
}
......@@ -41,14 +41,6 @@ public class IndicatorMetaInfo {
this.id = id;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@Override public String toString() {
return "IndicatorMetaInfo{" +
"indicatorName='" + indicatorName + '\'' +
......
......@@ -51,8 +51,4 @@ public abstract class LongAvgIndicator extends Indicator implements LongValueHol
@Override public final void calculate() {
this.value = this.summation / this.count;
}
@Override public long getValue() {
return value;
}
}
......@@ -46,8 +46,4 @@ public abstract class MaxDoubleIndicator extends Indicator implements DoubleValu
@Override public void calculate() {
}
@Override public double getValue() {
return value;
}
}
......@@ -18,11 +18,8 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.Entrance;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorFunction;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.SourceFrom;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
......@@ -49,8 +46,4 @@ public abstract class MaxLongIndicator extends Indicator implements LongValueHol
@Override public void calculate() {
}
@Override public long getValue() {
return value;
}
}
......@@ -45,8 +45,4 @@ public abstract class SumIndicator extends Indicator implements LongValueHolder
@Override public void calculate() {
}
@Override public long getValue() {
return value;
}
}
......@@ -18,12 +18,14 @@
package org.apache.skywalking.oap.server.library.client;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public interface Client {
void connect() throws ClientException;
void shutdown();
void connect() throws IOException;
void shutdown() throws IOException;
}
......@@ -18,53 +18,32 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.gson.*;
import java.io.*;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.*;
import org.apache.http.auth.*;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.*;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.skywalking.oap.server.library.client.Client;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.create.*;
import org.elasticsearch.action.admin.indices.delete.*;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.client.*;
import org.elasticsearch.common.unit.*;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -73,7 +52,7 @@ public class ElasticSearchClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
private static final String TYPE = "type";
public static final String TYPE = "type";
private final String clusterNodes;
private final String namespace;
private final String user;
......@@ -87,32 +66,23 @@ public class ElasticSearchClient implements Client {
this.password = password;
}
@Override public void connect() {
@Override public void connect() throws IOException {
List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
RestClientBuilder builder;
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
} else {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
}
client = new RestHighLevelClient(builder);
client.ping();
}
@Override public void shutdown() {
try {
client.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
@Override public void shutdown() throws IOException {
client.close();
}
private List<HttpHost> parseClusterNodes(String nodes) {
......@@ -128,17 +98,26 @@ public class ElasticSearchClient implements Client {
return httpHosts;
}
public boolean createIndex(String indexName, Settings settings,
XContentBuilder mappingBuilder) throws IOException {
public boolean createIndex(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings);
request.mapping(TYPE, mappingBuilder);
request.settings(settings.toString(), XContentType.JSON);
request.mapping(TYPE, mapping.toString(), XContentType.JSON);
CreateIndexResponse response = client.indices().create(request);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public JsonObject getIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
GetIndexRequest request = new GetIndexRequest();
request.indices(indexName);
Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/" + indexName);
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
Gson gson = new Gson();
return gson.fromJson(reader, JsonObject.class);
}
public boolean deleteIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
......@@ -155,6 +134,45 @@ public class ElasticSearchClient implements Client {
return client.indices().exists(request);
}
public boolean isExistsTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
Response response = client.getLowLevelClient().performRequest(HttpHead.METHOD_NAME, "/_template/" + indexName);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200) {
return true;
} else if (statusCode == 404) {
return false;
} else {
throw new IOException("The response status code of template exists request should be 200 or 404, but it is " + statusCode);
}
}
public boolean createTemplate(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
indexName = formatIndexName(indexName);
JsonArray patterns = new JsonArray();
patterns.add(indexName + "_*");
JsonObject template = new JsonObject();
template.add("index_patterns", patterns);
template.add("settings", settings);
template.add("mappings", mapping);
HttpEntity entity = new NStringEntity(template.toString(), ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient().performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
return response.getStatusLine().getStatusCode() == 200;
}
public boolean deleteTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
Response response = client.getLowLevelClient().performRequest(HttpDelete.METHOD_NAME, "/_template/" + indexName);
return response.getStatusLine().getStatusCode() == 200;
}
public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
......@@ -218,7 +236,7 @@ public class ElasticSearchClient implements Client {
" }" +
"}";
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
Response response = client.getLowLevelClient().performRequest(HttpPost.METHOD_NAME, "/" + indexName + "/_delete_by_query", params, entity);
logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString);
return response.getStatusLine().getStatusCode();
}
......@@ -230,23 +248,26 @@ public class ElasticSearchClient implements Client {
return indexName;
}
public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval,
int concurrentRequests) {
public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.info("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
logger.error("Failed to execute bulk", failure);
}
};
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import java.io.IOException;
import org.apache.skywalking.oap.server.library.client.ClientException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.junit.Assert;
/**
* @author peng-yongsheng
*/
public class ElasticSearchClientTestCase {
public static void main(String[] args) throws IOException, ClientException {
Settings settings = Settings.builder()
.put("number_of_shards", 2)
.put("number_of_replicas", 0)
.build();
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject()
.startObject("_all")
.field("enabled", false)
.endObject()
.startObject("properties")
.startObject("column1")
.field("type", "text")
.endObject()
.endObject();
builder.endObject();
ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null, null, null);
client.connect();
String indexName = "test";
client.createIndex(indexName, settings, builder);
Assert.assertTrue(client.isExistsIndex(indexName));
client.deleteIndex(indexName);
Assert.assertFalse(client.isExistsIndex(indexName));
client.shutdown();
}
}
......@@ -18,29 +18,167 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.elasticsearch.action.get.GetResponse;
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ITElasticSearchClient {
@Test
public void test() throws IOException {
ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null, null, null);
private static final Logger logger = LoggerFactory.getLogger(ITElasticSearchClient.class);
private ElasticSearchClient client;
@Before
public void before() throws IOException {
client = new ElasticSearchClient("localhost:9200", "", "test", "test");
client.connect();
}
@After
public void after() throws IOException {
client.shutdown();
}
@Test
public void indexOperate() throws IOException {
JsonObject settings = new JsonObject();
settings.addProperty("number_of_shards", 2);
settings.addProperty("number_of_replicas", 2);
JsonObject mapping = new JsonObject();
mapping.add("_doc", new JsonObject());
JsonObject doc = mapping.getAsJsonObject("_doc");
JsonObject properties = new JsonObject();
doc.add("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "text");
properties.add("column1", column);
String indexName = "test_index_operate";
client.createIndex(indexName, settings, doc);
Assert.assertTrue(client.isExistsIndex(indexName));
JsonObject index = client.getIndex(indexName);
logger.info(index.toString());
Assert.assertEquals(2, index.getAsJsonObject(indexName).getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
Assert.assertEquals(2, index.getAsJsonObject(indexName).getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
Assert.assertEquals("text", index.getAsJsonObject(indexName).getAsJsonObject("mappings").getAsJsonObject("type").getAsJsonObject("properties").getAsJsonObject("column1").get("type").getAsString());
Assert.assertTrue(client.deleteIndex(indexName));
}
@Test
public void documentOperate() throws IOException {
String id = String.valueOf(System.currentTimeMillis());
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("post_date", "2009-11-15T14:12:12")
.field("message", "trying out Elasticsearch")
.endObject();
String indexName = "test_document_operate";
client.forceInsert(indexName, id, builder);
GetResponse response = client.get(indexName, id);
Assert.assertEquals("kimchy", response.getSource().get("user"));
Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "pengys")
.endObject();
client.forceUpdate(indexName, id, builder);
response = client.get(indexName, id);
Assert.assertEquals("pengys", response.getSource().get("user"));
Assert.assertEquals("trying out Elasticsearch", response.getSource().get("message"));
List<String> ids = new ArrayList<>();
ids.add(id);
MultiGetResponse responses = client.multiGet(indexName, ids);
Assert.assertEquals(1, responses.getResponses().length);
Assert.assertEquals("pengys", responses.getResponses()[0].getResponse().getSource().get("user"));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("user", "pengys"));
SearchResponse searchResponse = client.search(indexName, sourceBuilder);
Assert.assertEquals("trying out Elasticsearch", searchResponse.getHits().getHits()[0].getSourceAsMap().get("message"));
}
@Test
public void templateOperate() throws IOException {
JsonObject settings = new JsonObject();
settings.addProperty("number_of_shards", 1);
settings.addProperty("number_of_replicas", 0);
settings.addProperty("index.refresh_interval", "3s");
settings.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
JsonObject mapping = new JsonObject();
mapping.add("type", new JsonObject());
JsonObject doc = mapping.getAsJsonObject("type");
JsonObject properties = new JsonObject();
doc.add("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "text");
properties.add("name", column);
String indexName = "template_operate";
client.createTemplate(indexName, settings, mapping);
Assert.assertTrue(client.isExistsTemplate(indexName));
XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
.field("name", "pengys")
.endObject();
client.forceInsert(indexName + "_2019", "testid", builder);
JsonObject index = client.getIndex(indexName + "_2019");
logger.info(index.toString());
Assert.assertEquals(1, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
Assert.assertEquals(0, index.getAsJsonObject(indexName + "_2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
client.deleteTemplate(indexName);
Assert.assertFalse(client.isExistsTemplate(indexName));
}
@Test
public void bulk() throws InterruptedException {
BulkProcessor bulkProcessor = client.createBulkProcessor(2000, 200, 10, 2);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("key", "value");
builder.endObject();
client.forceInsert("test_index", "201904091521", builder);
Map<String, String> source = new HashMap<>();
source.put("column1", "value1");
source.put("column2", "value2");
GetResponse response = client.get("test_index", "201904091521");
for (int i = 0; i < 100; i++) {
IndexRequest indexRequest = new IndexRequest("bulk_insert_test", "type", String.valueOf(i));
indexRequest.source(source);
bulkProcessor.add(indexRequest);
}
Assert.assertTrue(response.getSource().containsKey("key"));
Assert.assertEquals("value", response.getSource().get("key"));
bulkProcessor.flush();
bulkProcessor.awaitClose(2, TimeUnit.SECONDS);
}
}
......@@ -17,14 +17,14 @@
~
-->
<Configuration status="DEBUG">
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import java.io.IOException;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.*;
......@@ -98,7 +99,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
lockInstaller.install();
} catch (StorageException e) {
} catch (StorageException | IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
......
......@@ -18,21 +18,14 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -43,13 +36,13 @@ public class StorageEsInstaller extends ModelInstaller {
private final int indexShardsNumber;
private final int indexReplicasNumber;
private final ColumnTypeEsMapping mapping;
private final ColumnTypeEsMapping columnTypeEsMapping;
public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
super(moduleManager);
this.indexShardsNumber = indexShardsNumber;
this.indexReplicasNumber = indexReplicasNumber;
this.mapping = new ColumnTypeEsMapping();
this.columnTypeEsMapping = new ColumnTypeEsMapping();
}
@Override protected boolean isExists(Client client, Model tableDefine) throws StorageException {
......@@ -80,20 +73,13 @@ public class StorageEsInstaller extends ModelInstaller {
@Override protected void createTable(Client client, Model tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
// mapping
XContentBuilder mappingBuilder = null;
Settings settings = createSettingBuilder();
try {
mappingBuilder = createMappingBuilder(tableDefine);
logger.info("index {}'s mapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), Strings.toString(mappingBuilder.prettyPrint()));
} catch (Exception e) {
logger.error("create {} index mapping builder error, error message: {}", esClient.formatIndexName(tableDefine.getName()), e.getMessage());
}
JsonObject settings = createSetting();
JsonObject mapping = createMapping(tableDefine);
logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), mapping.toString());
boolean isAcknowledged;
try {
isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mappingBuilder);
isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mapping);
} catch (IOException e) {
throw new StorageException(e.getMessage());
}
......@@ -104,50 +90,46 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
private Settings createSettingBuilder() {
return Settings.builder()
.put("index.number_of_shards", indexShardsNumber)
.put("index.number_of_replicas", indexReplicasNumber)
.put("index.refresh_interval", "3s")
.put("analysis.analyzer.oap_analyzer.type", "stop")
.build();
private JsonObject createSetting() {
JsonObject setting = new JsonObject();
setting.addProperty("index.number_of_shards", indexShardsNumber);
setting.addProperty("index.number_of_replicas", indexReplicasNumber);
setting.addProperty("index.refresh_interval", "3s");
setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
return setting;
}
private XContentBuilder createMappingBuilder(Model tableDefine) throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("_all")
.field("enabled", false)
.endObject()
.startObject("properties");
private JsonObject createMapping(Model tableDefine) {
JsonObject mapping = new JsonObject();
mapping.add(ElasticSearchClient.TYPE, new JsonObject());
JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
JsonObject properties = new JsonObject();
type.add("properties", properties);
for (ModelColumn columnDefine : tableDefine.getColumns()) {
if (columnDefine.isMatchQuery()) {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
mappingBuilder
.startObject(columnDefine.getColumnName().getName())
.field("type", mapping.transform(columnDefine.getType()))
.field("copy_to", matchCName)
.endObject()
.startObject(matchCName)
.field("type", "text")
.field("analyzer", "oap_analyzer")
.endObject();
JsonObject originalColumn = new JsonObject();
originalColumn.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
originalColumn.addProperty("copy_to", matchCName);
properties.add(columnDefine.getColumnName().getName(), originalColumn);
JsonObject matchColumn = new JsonObject();
matchColumn.addProperty("type", "text");
matchColumn.addProperty("analyzer", "oap_analyzer");
properties.add(columnDefine.getColumnName().getName(), matchColumn);
} else {
mappingBuilder
.startObject(columnDefine.getColumnName().getName())
.field("type", mapping.transform(columnDefine.getType()))
.endObject();
JsonObject column = new JsonObject();
column.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
properties.add(columnDefine.getColumnName().getName(), column);
}
}
mappingBuilder
.endObject()
.endObject();
logger.debug("create elasticsearch index: {}", mappingBuilder.prettyPrint());
logger.debug("create elasticsearch index: {}", mapping.toString());
return mappingBuilder;
return mapping;
}
}
......@@ -18,13 +18,13 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
......@@ -68,22 +68,24 @@ public class RegisterLockInstaller {
}
private void createIndex() throws IOException {
Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "1s")
.build();
XContentBuilder source = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(RegisterLockIndex.COLUMN_SEQUENCE)
.field("type", "integer")
.endObject()
.endObject()
.endObject();
client.createIndex(RegisterLockIndex.NAME, settings, source);
JsonObject settings = new JsonObject();
settings.addProperty("index.number_of_shards", 1);
settings.addProperty("index.number_of_replicas", 0);
settings.addProperty("index.refresh_interval", "1s");
JsonObject mapping = new JsonObject();
mapping.add(ElasticSearchClient.TYPE, new JsonObject());
JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
JsonObject properties = new JsonObject();
type.add("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "integer");
properties.add(RegisterLockIndex.COLUMN_SEQUENCE, column);
client.createIndex(RegisterLockIndex.NAME, settings, mapping);
}
private void putIfAbsent(int scopeId) throws IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册