提交 deb2d900 编写于 作者: Z zhangwei 提交者: wu-sheng

fix deleteIndex repeat append namespace (#3017)

* fix deleteIndex repeat append namespace

* fix deleteIndex repeat append namespace

* fix checkStyle

* fix

* fix test assert

* update test method name

* fix test

* fix

* del elastic.search.namespace

* fix check style

* update ITElasticSearchNamespaceClient to ITElasticSearchClientOfNamespace

* move undoFormatIndexName to ITElasticSearchClien

* move #getIndex to ITElasticSearchClient

* add deleteByModelName and deleteByIndexName

* add deleteByModelName and deleteByIndexName

* revert ITElasticSearchClient#getIndex

* Use whitebox get from powermock to get this.

* undo namespacePrefix
上级 8b2cfa2c
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.gson.*;
import java.io.*;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.*;
import org.apache.http.auth.*;
......@@ -121,32 +122,43 @@ public class ElasticSearchClient implements Client {
public List<String> retrievalIndexByAliases(String aliases) throws IOException {
aliases = formatIndexName(aliases);
Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/_alias/" + aliases);
List<String> indexes = new ArrayList<>();
if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
Gson gson = new Gson();
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
logger.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
indexes.addAll(responseJson.keySet());
return new ArrayList<>(responseJson.keySet());
}
return indexes;
return Collections.EMPTY_LIST;
}
public JsonObject getIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
GetIndexRequest request = new GetIndexRequest();
request.indices(indexName);
Response response = client.getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/" + indexName);
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
Gson gson = new Gson();
return gson.fromJson(reader, JsonObject.class);
/**
* If your indexName is retrieved from elasticsearch through {@link #retrievalIndexByAliases(String)} or some other method and it already contains namespace.
* Then you should delete the index by this method, this method will no longer concatenate namespace.
*
* https://github.com/apache/skywalking/pull/3017
*
*/
public boolean deleteByIndexName(String indexName) throws IOException {
return deleteIndex(indexName, false);
}
public boolean deleteIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
/**
* If your indexName is obtained from metadata or configuration and without namespace.
* Then you should delete the index by this method, this method automatically concatenates namespace.
*
* https://github.com/apache/skywalking/pull/3017
*
*/
public boolean deleteByModelName(String modelName) throws IOException {
return deleteIndex(modelName, true);
}
private boolean deleteIndex(String indexName, boolean formatIndexName) throws IOException {
if (formatIndexName) {
indexName = formatIndexName(indexName);
}
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
DeleteIndexResponse response;
response = client.indices().delete(request);
......
......@@ -18,11 +18,17 @@
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -31,11 +37,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
......@@ -48,10 +57,20 @@ public class ITElasticSearchClient {
private ElasticSearchClient client;
private final String namespace;
public ITElasticSearchClient() {
namespace = "";
}
protected ITElasticSearchClient(String namespace) {
this.namespace = namespace;
}
@Before
public void before() throws IOException {
final String esAddress = System.getProperty("elastic.search.address");
client = new ElasticSearchClient(esAddress, "", "test", "test");
client = new ElasticSearchClient(esAddress, namespace, "test", "test");
client.connect();
}
......@@ -82,7 +101,7 @@ public class ITElasticSearchClient {
client.createIndex(indexName, settings, doc);
Assert.assertTrue(client.isExistsIndex(indexName));
JsonObject index = client.getIndex(indexName);
JsonObject index = getIndex(indexName);
logger.info(index.toString());
Assert.assertEquals(2, index.getAsJsonObject(indexName).getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
......@@ -90,7 +109,7 @@ public class ITElasticSearchClient {
Assert.assertEquals("text", index.getAsJsonObject(indexName).getAsJsonObject("mappings").getAsJsonObject("type").getAsJsonObject("properties").getAsJsonObject("column1").get("type").getAsString());
Assert.assertTrue(client.deleteIndex(indexName));
Assert.assertTrue(client.deleteByModelName(indexName));
}
@Test
......@@ -157,8 +176,9 @@ public class ITElasticSearchClient {
.endObject();
client.forceInsert(indexName + "-2019", "testid", builder);
JsonObject index = client.getIndex(indexName + "-2019");
JsonObject index = getIndex(indexName + "-2019");
logger.info(index.toString());
Assert.assertEquals(1, index.getAsJsonObject(indexName + "-2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_shards").getAsInt());
Assert.assertEquals(0, index.getAsJsonObject(indexName + "-2019").getAsJsonObject("settings").getAsJsonObject("index").get("number_of_replicas").getAsInt());
......@@ -183,4 +203,67 @@ public class ITElasticSearchClient {
bulkProcessor.flush();
bulkProcessor.awaitClose(2, TimeUnit.SECONDS);
}
@Test
public void timeSeriesOperate() throws IOException {
String indexName = "test_time_series_operate";
String timeSeriesIndexName = indexName + "-2019";
JsonObject mapping = new JsonObject();
mapping.add("type", new JsonObject());
JsonObject doc = mapping.getAsJsonObject("type");
JsonObject properties = new JsonObject();
doc.add("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "text");
properties.add("name", column);
client.createTemplate(indexName, new JsonObject(), mapping);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
.field("name", "pengys")
.endObject();
client.forceInsert(timeSeriesIndexName, "testid", builder);
List<String> indexes = client.retrievalIndexByAliases(indexName);
Assert.assertEquals(1, indexes.size());
String index = indexes.get(0);
Assert.assertTrue(client.deleteByIndexName(index));
Assert.assertFalse(client.isExistsIndex(timeSeriesIndexName));
}
private JsonObject getIndex(String indexName) throws IOException {
indexName = client.formatIndexName(indexName);
GetIndexRequest request = new GetIndexRequest();
request.indices(indexName);
Response response = getRestHighLevelClient().getLowLevelClient().performRequest(HttpGet.METHOD_NAME, "/" + indexName);
InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
Gson gson = new Gson();
return undoFormatIndexName(gson.fromJson(reader, JsonObject.class));
}
private RestHighLevelClient getRestHighLevelClient() {
return (RestHighLevelClient) Whitebox.getInternalState(client, "client");
}
private JsonObject undoFormatIndexName(JsonObject index) {
if (StringUtils.isNotEmpty(namespace) && index != null && index.size() > 0) {
logger.info("UndoFormatIndexName before " + index.toString());
String namespacePrefix = namespace + "_";
index.entrySet().forEach(entry -> {
String oldIndexName = entry.getKey();
if (oldIndexName.startsWith(namespacePrefix)) {
index.add(oldIndexName.substring(namespacePrefix.length()), entry.getValue());
index.remove(oldIndexName);
} else {
throw new RuntimeException("The indexName must contain the " + namespace + " prefix, but it is " + entry.getKey());
}
});
logger.info("UndoFormatIndexName after " + index.toString());
}
return index;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client.elasticsearch;
/**
* @author: zhangwei
*/
public class ITElasticSearchClientOfNamespace extends ITElasticSearchClient {
public ITElasticSearchClientOfNamespace() {
super("test");
}
}
......@@ -69,7 +69,7 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
}
for (String prepareDeleteIndex : prepareDeleteIndexes) {
client.deleteIndex(prepareDeleteIndex);
client.deleteByIndexName(prepareDeleteIndex);
}
} else {
int statusCode = client.delete(model.getName(), timeBucketColumnName, timeBefore);
......
......@@ -64,7 +64,7 @@ public class RegisterLockInstaller {
}
private void deleteIndex() throws IOException {
client.deleteIndex(RegisterLockIndex.NAME);
client.deleteByModelName(RegisterLockIndex.NAME);
}
private void createIndex() throws IOException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册