ElasticSearchStorageInstaller.java 5.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

19 20 21 22 23 24 25 26 27 28 29 30 31
package org.skywalking.apm.collector.storage.elasticsearch.define;

import java.io.IOException;
import java.util.List;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.storage.ColumnDefine;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.core.storage.TableDefine;
32
import org.skywalking.apm.collector.storage.elasticsearch.StorageElasticSearchConfig;
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author pengys5
 */
public class ElasticSearchStorageInstaller extends StorageInstaller {

    private final Logger logger = LoggerFactory.getLogger(ElasticSearchStorageInstaller.class);

    @Override protected void defineFilter(List<TableDefine> tableDefines) {
        int size = tableDefines.size();
        for (int i = size - 1; i >= 0; i--) {
            if (!(tableDefines.get(i) instanceof ElasticSearchTableDefine)) {
                tableDefines.remove(i);
            }
        }
    }

    @Override protected boolean createTable(Client client, TableDefine tableDefine) {
        ElasticSearchClient esClient = (ElasticSearchClient)client;
        ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine;
        // mapping
        XContentBuilder mappingBuilder = null;
57 58

        Settings settings = createSettingBuilder(esTableDefine);
59 60 61 62 63 64 65 66 67 68 69 70
        try {
            mappingBuilder = createMappingBuilder(esTableDefine);
            logger.info("mapping builder str: {}", mappingBuilder.string());
        } catch (Exception e) {
            logger.error("create {} index mapping builder error", esTableDefine.getName());
        }

        boolean isAcknowledged = esClient.createIndex(esTableDefine.getName(), esTableDefine.type(), settings, mappingBuilder);
        logger.info("create {} index with type of {} finished, isAcknowledged: {}", esTableDefine.getName(), esTableDefine.type(), isAcknowledged);
        return isAcknowledged;
    }

71 72
    private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) {
        return Settings.builder()
73 74
            .put("index.number_of_shards", StorageElasticSearchConfig.INDEX_SHARDS_NUMBER)
            .put("index.number_of_replicas", StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER)
P
pengys5 已提交
75 76 77 78 79 80
            .put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s")

            .put("analysis.analyzer.collector_analyzer.tokenizer", "collector_tokenizer")
            .put("analysis.tokenizer.collector_tokenizer.type", "standard")
            .put("analysis.tokenizer.collector_tokenizer.max_token_length", 5)
            .build();
81 82 83 84 85 86 87 88 89
    }

    private XContentBuilder createMappingBuilder(ElasticSearchTableDefine tableDefine) throws IOException {
        XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
            .startObject()
            .startObject("properties");

        for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
            ElasticSearchColumnDefine elasticSearchColumnDefine = (ElasticSearchColumnDefine)columnDefine;
90 91 92 93 94 95 96 97 98 99 100 101 102

            if (ElasticSearchColumnDefine.Type.Text.name().toLowerCase().equals(elasticSearchColumnDefine.getType().toLowerCase())) {
                mappingBuilder
                    .startObject(elasticSearchColumnDefine.getName())
                    .field("type", elasticSearchColumnDefine.getType().toLowerCase())
                    .field("fielddata", true)
                    .endObject();
            } else {
                mappingBuilder
                    .startObject(elasticSearchColumnDefine.getName())
                    .field("type", elasticSearchColumnDefine.getType().toLowerCase())
                    .endObject();
            }
103 104 105 106 107
        }

        mappingBuilder
            .endObject()
            .endObject();
P
pengys5 已提交
108
        logger.debug("create elasticsearch index: {}", mappingBuilder.string());
109 110 111
        return mappingBuilder;
    }

112
    @Override protected boolean deleteTable(Client client, TableDefine tableDefine) {
113 114 115 116 117 118 119 120 121 122 123 124 125 126
        ElasticSearchClient esClient = (ElasticSearchClient)client;
        try {
            return esClient.deleteIndex(tableDefine.getName());
        } catch (IndexNotFoundException e) {
            logger.info("{} index not found", tableDefine.getName());
        }
        return false;
    }

    @Override protected boolean isExists(Client client, TableDefine tableDefine) {
        ElasticSearchClient esClient = (ElasticSearchClient)client;
        return esClient.isExistsIndex(tableDefine.getName());
    }
}