/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.apm.collector.storage.es;
20 21

import java.util.Properties;
22
import java.util.UUID;
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.storage.StorageException;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.apache.skywalking.apm.collector.storage.dao.IAlertingListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationComponentPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationComponentUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationMappingPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationMappingUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationReferenceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.ICpuMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ICpuMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
P
peng-yongsheng 已提交
52
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
53
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
P
peng-yongsheng 已提交
54
import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO;
55 56 57
import org.apache.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
P
peng-yongsheng 已提交
58
import org.apache.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO;
59 60 61
import org.apache.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IMemoryPoolMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
P
peng-yongsheng 已提交
62
import org.apache.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
63 64 65 66 67 68 69 70 71 72 73 74 75 76
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
import org.apache.skywalking.apm.collector.storage.es.dao.AlertingListEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationComponentEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationComponentEsUIDAO;
P
peng-yongsheng 已提交
77
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationEsCacheDAO;
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationEsRegisterDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationMappingEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationMappingEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationReferenceMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationReferenceMetricEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.CpuMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.CpuMetricEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.GCMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.GCMetricEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.GlobalTraceEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsCacheDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsRegisterDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsUIDAO;
P
peng-yongsheng 已提交
96
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceReferenceMetricEsPersistenceDAO;
97 98 99 100 101
import org.apache.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.SegmentCostEsPersistenceDAO;
P
peng-yongsheng 已提交
102
import org.apache.skywalking.apm.collector.storage.es.dao.SegmentCostEsUIDAO;
103 104 105 106 107 108 109 110 111
import org.apache.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.SegmentEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceEntryEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceNameEsRegisterDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceReferenceMetricEsPersistenceDAO;
112 113
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
114 115 116 117 118 119

/**
 * @author peng-yongsheng
 */
public class StorageModuleEsProvider extends ModuleProvider {

120 121
    private final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);

122
    public static final String NAME = "elasticsearch";
123 124 125 126 127
    private static final String CLUSTER_NAME = "cluster_name";
    private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
    private static final String CLUSTER_NODES = "cluster_nodes";
    private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
    private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
128
    private static final String TIME_TO_LIVE_OF_DATA = "ttl";
129 130

    private ElasticSearchClient elasticSearchClient;
131
    private DataTTLKeeperTimer deleteTimer;
132

133
    @Override public String name() {
134
        return NAME;
135 136 137 138 139 140 141
    }

    @Override public Class<? extends Module> module() {
        return StorageModule.class;
    }

    @Override public void prepare(Properties config) throws ServiceNotProvidedException {
142 143 144 145
        String clusterName = config.getProperty(CLUSTER_NAME);
        Boolean clusterTransportSniffer = (Boolean)config.get(CLUSTER_TRANSPORT_SNIFFER);
        String clusterNodes = config.getProperty(CLUSTER_NODES);
        elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
P
peng-yongsheng 已提交
146

P
peng-yongsheng 已提交
147 148 149 150 151
        this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
        registerCacheDAO();
        registerRegisterDAO();
        registerPersistenceDAO();
        registerUiDAO();
P
peng-yongsheng 已提交
152
        registerAlertingDAO();
153 154 155
    }

    @Override public void start(Properties config) throws ServiceNotProvidedException {
156 157 158 159 160 161 162
        Integer indexShardsNumber = (Integer)config.get(INDEX_SHARDS_NUMBER);
        Integer indexReplicasNumber = (Integer)config.get(INDEX_REPLICAS_NUMBER);
        try {
            elasticSearchClient.initialize();

            ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber);
            installer.install(elasticSearchClient);
P
peng-yongsheng 已提交
163
        } catch (ClientException | StorageException e) {
164 165
            logger.error(e.getMessage(), e);
        }
166 167 168 169 170 171 172 173 174

        String uuId = UUID.randomUUID().toString();
        ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
        moduleRegisterService.register(StorageModule.NAME, this.name(), new StorageModuleEsRegistration(uuId, 0));

        StorageModuleEsNamingListener namingListener = new StorageModuleEsNamingListener();
        ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
        moduleListenerService.addListener(namingListener);

175 176
        Integer beforeDay = (Integer)config.getOrDefault(TIME_TO_LIVE_OF_DATA, 3);
        deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, uuId + 0, beforeDay);
177 178 179
    }

    @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
180
        deleteTimer.start();
181 182 183
    }

    @Override public String[] requiredModules() {
184
        return new String[] {ClusterModule.NAME};
185
    }
P
peng-yongsheng 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205

    private void registerCacheDAO() throws ServiceNotProvidedException {
        this.registerServiceImplementation(IApplicationCacheDAO.class, new ApplicationEsCacheDAO(elasticSearchClient));
        this.registerServiceImplementation(IInstanceCacheDAO.class, new InstanceEsCacheDAO(elasticSearchClient));
        this.registerServiceImplementation(IServiceNameCacheDAO.class, new ServiceNameEsCacheDAO(elasticSearchClient));
    }

    private void registerRegisterDAO() throws ServiceNotProvidedException {
        this.registerServiceImplementation(IApplicationRegisterDAO.class, new ApplicationEsRegisterDAO(elasticSearchClient));
        this.registerServiceImplementation(IInstanceRegisterDAO.class, new InstanceEsRegisterDAO(elasticSearchClient));
        this.registerServiceImplementation(IServiceNameRegisterDAO.class, new ServiceNameEsRegisterDAO(elasticSearchClient));
    }

    private void registerPersistenceDAO() throws ServiceNotProvidedException {
        this.registerServiceImplementation(ICpuMetricPersistenceDAO.class, new CpuMetricEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(IGCMetricPersistenceDAO.class, new GCMetricEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(IMemoryMetricPersistenceDAO.class, new MemoryMetricEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(IMemoryPoolMetricPersistenceDAO.class, new MemoryPoolMetricEsPersistenceDAO(elasticSearchClient));

        this.registerServiceImplementation(IGlobalTracePersistenceDAO.class, new GlobalTraceEsPersistenceDAO(elasticSearchClient));
P
peng-yongsheng 已提交
206 207
        this.registerServiceImplementation(IApplicationComponentPersistenceDAO.class, new ApplicationComponentEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(IApplicationMappingPersistenceDAO.class, new ApplicationMappingEsPersistenceDAO(elasticSearchClient));
208
        this.registerServiceImplementation(IApplicationMetricPersistenceDAO.class, new ApplicationMetricEsPersistenceDAO(elasticSearchClient));
209
        this.registerServiceImplementation(IApplicationReferenceMetricPersistenceDAO.class, new ApplicationReferenceMetricEsPersistenceDAO(elasticSearchClient));
P
peng-yongsheng 已提交
210 211 212
        this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryEsPersistenceDAO(elasticSearchClient));
213 214
        this.registerServiceImplementation(IServiceMetricPersistenceDAO.class, new ServiceMetricEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceMetricEsPersistenceDAO(elasticSearchClient));
P
peng-yongsheng 已提交
215

P
peng-yongsheng 已提交
216 217
        this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricEsPersistenceDAO(elasticSearchClient));
        this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricEsPersistenceDAO(elasticSearchClient));
P
peng-yongsheng 已提交
218 219 220 221 222 223 224 225 226
        this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient));
    }

    private void registerUiDAO() throws ServiceNotProvidedException {
        this.registerServiceImplementation(IInstanceUIDAO.class, new InstanceEsUIDAO(elasticSearchClient));

        this.registerServiceImplementation(ICpuMetricUIDAO.class, new CpuMetricEsUIDAO(elasticSearchClient));
        this.registerServiceImplementation(IGCMetricUIDAO.class, new GCMetricEsUIDAO(elasticSearchClient));
        this.registerServiceImplementation(IMemoryMetricUIDAO.class, new MemoryMetricEsUIDAO(elasticSearchClient));
227
        this.registerServiceImplementation(IMemoryPoolMetricUIDAO.class, new MemoryPoolMetricEsUIDAO(elasticSearchClient));
P
peng-yongsheng 已提交
228 229

        this.registerServiceImplementation(IGlobalTraceUIDAO.class, new GlobalTraceEsUIDAO(elasticSearchClient));
230
        this.registerServiceImplementation(IInstanceMetricUIDAO.class, new InstanceMetricEsUIDAO(elasticSearchClient));
231 232 233
        this.registerServiceImplementation(IApplicationComponentUIDAO.class, new ApplicationComponentEsUIDAO(elasticSearchClient));
        this.registerServiceImplementation(IApplicationMappingUIDAO.class, new ApplicationMappingEsUIDAO(elasticSearchClient));
        this.registerServiceImplementation(IApplicationReferenceMetricUIDAO.class, new ApplicationReferenceMetricEsUIDAO(elasticSearchClient));
P
peng-yongsheng 已提交
234 235 236
        this.registerServiceImplementation(ISegmentCostUIDAO.class, new SegmentCostEsUIDAO(elasticSearchClient));
        this.registerServiceImplementation(ISegmentUIDAO.class, new SegmentEsUIDAO(elasticSearchClient));
        this.registerServiceImplementation(IServiceEntryUIDAO.class, new ServiceEntryEsUIDAO(elasticSearchClient));
237
        this.registerServiceImplementation(IServiceReferenceUIDAO.class, new ServiceReferenceEsUIDAO(elasticSearchClient));
P
peng-yongsheng 已提交
238
    }
P
peng-yongsheng 已提交
239 240 241 242

    private void registerAlertingDAO() throws ServiceNotProvidedException {
        this.registerServiceImplementation(IAlertingListPersistenceDAO.class, new AlertingListEsPersistenceDAO(elasticSearchClient));
    }
wu-sheng's avatar
wu-sheng 已提交
243
}