提交 06165a03 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Feature/oap/storage (#1516)

* Storage and Persistence.

* Storage config.

* Fixed the CI failure.
上级 7785e42f
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.storage.base.dao;
import java.util.List;
......
......@@ -55,7 +55,7 @@
<h2.version>1.4.196</h2.version>
<shardingjdbc.version>2.0.3</shardingjdbc.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<elasticsearch.version>6.3.1</elasticsearch.version>
<elasticsearch.version>6.3.2</elasticsearch.version>
<joda-time.version>2.9.9</joda-time.version>
<kubernetes.version>2.0.0</kubernetes.version>
</properties>
......@@ -142,7 +142,7 @@
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
......
......@@ -48,7 +48,7 @@ public class CoreModuleProvider extends ModuleProvider {
super();
this.moduleConfig = new CoreModuleConfig();
this.indicatorMapper = new IndicatorMapper();
this.workerMapper = new WorkerMapper(getManager());
this.workerMapper = new WorkerMapper();
}
@Override public String name() {
......@@ -87,7 +87,7 @@ public class CoreModuleProvider extends ModuleProvider {
try {
indicatorMapper.load();
workerMapper.load();
workerMapper.load(getManager());
} catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
throw new ModuleStartException(e.getMessage(), e);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core;
/**
* @author wu-sheng
*/
public class UnexpectedException extends RuntimeException {
public UnexpectedException(String message) {
super(message);
}
}
......@@ -20,23 +20,17 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint;
import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker<EndpointLatencyAvgIndicator> {
private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
private final EndpointLatencyAvgRemoteWorker remoter;
public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
super(moduleManager);
this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
}
@Override protected void onNext(EndpointLatencyAvgIndicator data) {
remoter.in(data);
@Override public Class nextWorkerClass() {
return EndpointLatencyAvgRemoteWorker.class;
}
}
......@@ -18,16 +18,33 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgIndicator extends AvgIndicator {
@Setter @Getter private int id;
private static final String NAME = "endpoint_latency_avg";
private static final String ID = "id";
private static final String SERVICE_ID = "service_id";
private static final String SERVICE_INSTANCE_ID = "service_instance_id";
@Setter @Getter @Column(columnName = ID) private int id;
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
@Override public String name() {
return NAME;
}
@Override public String id() {
return String.valueOf(id);
}
@Override public int hashCode() {
int result = 17;
......@@ -56,18 +73,49 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getId());
remoteBuilder.setDataIntegers(1, getCount());
remoteBuilder.setDataIntegers(1, getServiceId());
remoteBuilder.setDataIntegers(2, getServiceInstanceId());
remoteBuilder.setDataIntegers(3, getCount());
remoteBuilder.setDataLongs(0, getTimeBucket());
remoteBuilder.setDataLongs(1, getSummation());
remoteBuilder.setDataLongs(2, getValue());
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setId(remoteData.getDataIntegers(0));
setCount(remoteData.getDataIntegers(1));
setServiceId(remoteData.getDataIntegers(1));
setServiceInstanceId(remoteData.getDataIntegers(2));
setCount(remoteData.getDataIntegers(3));
setTimeBucket(remoteData.getDataLongs(0));
setSummation(remoteData.getDataLongs(1));
setValue(remoteData.getDataLongs(2));
}
@Override public Map<String, Object> toMap() {
Map<String, Object> map = new HashMap<>();
map.put(ID, id);
map.put(SERVICE_ID, serviceId);
map.put(SERVICE_INSTANCE_ID, serviceInstanceId);
map.put(COUNT, getCount());
map.put(SUMMATION, getSummation());
map.put(VALUE, getValue());
map.put(TIME_BUCKET, getTimeBucket());
return map;
}
@Override public Indicator newOne(Map<String, Object> dbMap) {
EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
indicator.setId((Integer)dbMap.get(ID));
indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
indicator.setCount((Integer)dbMap.get(COUNT));
indicator.setSummation((Long)dbMap.get(SUMMATION));
indicator.setValue((Long)dbMap.get(VALUE));
indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
return indicator;
}
}
......@@ -29,4 +29,8 @@ public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker
public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override protected boolean needMergeDBData() {
return true;
}
}
......@@ -21,15 +21,21 @@ package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
@IndicatorType(selector = Selector.HashCode)
@IndicatorType(selector = Selector.HashCode, needMerge = true)
public abstract class AvgIndicator extends Indicator {
@Getter @Setter private long summation;
@Getter @Setter private int count;
protected static final String SUMMATION = "summation";
protected static final String COUNT = "count";
protected static final String VALUE = "value";
@Getter @Setter @Column(columnName = SUMMATION) private long summation;
@Getter @Setter @Column(columnName = COUNT) private int count;
@Getter @Setter @Column(columnName = VALUE) private long value;
@Entrance
public final void combine(@SourceFrom long summation, @ConstOne int count) {
......
......@@ -18,15 +18,27 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
import java.util.Map;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
public abstract class Indicator extends StreamData {
@Getter @Setter private long timeBucket;
protected static final String TIME_BUCKET = "time_bucket";
@Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
public abstract String id();
public abstract void combine(Indicator indicator);
public abstract String name();
public abstract Map<String, Object> toMap();
public abstract Indicator newOne(Map<String, Object> dbMap);
}
......@@ -28,4 +28,6 @@ import org.apache.skywalking.oap.server.core.remote.selector.Selector;
@Retention(RetentionPolicy.SOURCE)
public @interface IndicatorType {
Selector selector();
boolean needMerge();
}
......@@ -79,4 +79,8 @@ public class IndicatorMapper implements Service {
public Class<Indicator> findClassById(int id) {
return idKeyMapping.get(id);
}
public Collection<Class<Indicator>> indicatorClasses() {
return idKeyMapping.values();
}
}
......@@ -21,8 +21,10 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
......@@ -33,11 +35,14 @@ public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends
private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
private Worker worker;
private final ModuleManager moduleManager;
private final DataCarrier<INPUT> dataCarrier;
private final MergeDataCache<INPUT> mergeDataCache;
private int messageNum;
public AbstractAggregatorWorker(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
......@@ -78,7 +83,15 @@ public abstract class AbstractAggregatorWorker<INPUT extends Indicator> extends
mergeDataCache.finishReadingLast();
}
protected abstract void onNext(INPUT data);
private void onNext(INPUT data) {
if (worker == null) {
WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
worker = workerMapper.findInstanceByClass(nextWorkerClass());
}
worker.in(data);
}
public abstract Class nextWorkerClass();
private void aggregate(INPUT message) {
mergeDataCache.writing();
......
......@@ -18,18 +18,119 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
public abstract class AbstractPersistentWorker<INPUT extends Indicator> extends Worker<INPUT> {
private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentWorker.class);
private final MergeDataCache<INPUT> mergeDataCache;
private final IBatchDAO batchDAO;
private final IPersistenceDAO<?, ?, INPUT> persistenceDAO;
private final int blockBatchPersistenceSize = 1000;
public AbstractPersistentWorker(ModuleManager moduleManager) {
this.mergeDataCache = new MergeDataCache<>();
this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
this.persistenceDAO = moduleManager.find(StorageModule.NAME).getService(IPersistenceDAO.class);
}
public final Window<MergeDataCollection<INPUT>> getCache() {
return mergeDataCache;
}
@Override public final void in(INPUT input) {
if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
try {
if (getCache().trySwitchPointer()) {
getCache().switchPointer();
List<?> collection = buildBatchCollection();
batchDAO.batchPersistence(collection);
}
} finally {
getCache().trySwitchPointerFinally();
}
}
cacheData(input);
}
public final List<?> buildBatchCollection() {
List<?> batchCollection = new LinkedList<>();
try {
while (getCache().getLast().isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("thread wake up");
}
}
if (getCache().getLast().collection() != null) {
batchCollection = prepareBatch(getCache().getLast());
}
} finally {
getCache().finishReadingLast();
}
return batchCollection;
}
private List<Object> prepareBatch(MergeDataCollection<INPUT> collection) {
List<Object> batchCollection = new LinkedList<>();
collection.collection().forEach((id, data) -> {
if (needMergeDBData()) {
INPUT dbData = null;
try {
dbData = persistenceDAO.get(data);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
if (nonNull(dbData)) {
dbData.combine(data);
try {
batchCollection.add(persistenceDAO.prepareBatchUpdate(dbData));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} else {
try {
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
} else {
try {
batchCollection.add(persistenceDAO.prepareBatchInsert(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
});
return batchCollection;
}
private void cacheData(INPUT input) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(input)) {
mergeDataCache.get(input).combine(input);
} else {
mergeDataCache.put(input);
}
mergeDataCache.finishWriting();
}
protected abstract boolean needMergeDBData();
}
......@@ -34,14 +34,12 @@ public class WorkerMapper implements Service {
private static final Logger logger = LoggerFactory.getLogger(WorkerMapper.class);
private int id = 0;
private final ModuleManager moduleManager;
private final Map<Class<Worker>, Integer> classKeyMapping;
private final Map<Integer, Class<Worker>> idKeyMapping;
private final Map<Class<Worker>, Worker> classKeyInstanceMapping;
private final Map<Integer, Worker> idKeyInstanceMapping;
public WorkerMapper(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
public WorkerMapper() {
this.classKeyMapping = new HashMap<>();
this.idKeyMapping = new HashMap<>();
this.classKeyInstanceMapping = new HashMap<>();
......@@ -49,7 +47,7 @@ public class WorkerMapper implements Service {
}
@SuppressWarnings(value = "unchecked")
public void load() throws WorkerDefineLoadException {
public void load(ModuleManager moduleManager) throws WorkerDefineLoadException {
try {
List<String> workerClasses = new LinkedList<>();
......
......@@ -20,10 +20,12 @@ package org.apache.skywalking.oap.server.core.receiver;
import lombok.*;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.receiver.annotation.SourceType;
/**
* @author peng-yongsheng
*/
@SourceType
public class Endpoint extends Source {
@Override public Scope scope() {
return Scope.Endpoint;
......@@ -31,7 +33,9 @@ public class Endpoint extends Source {
@Getter @Setter private int id;
@Getter @Setter private String name;
@Getter @Setter private int serviceId;
@Getter @Setter private String serviceName;
@Getter @Setter private int serviceInstanceId;
@Getter @Setter private String serviceInstanceName;
@Getter @Setter private int latency;
@Getter @Setter private boolean status;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.receiver.annotation;
import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.SOURCE)
public @interface SourceType {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.library.client.Client;
/**
* @author peng-yongsheng
*/
public abstract class AbstractDAO<C extends Client> implements DAO {
private final C client;
public AbstractDAO(C client) {
this.client = client;
}
public final C getClient() {
return client;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* @author peng-yongsheng
*/
public interface DAO extends Service {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.List;
/**
* @author peng-yongsheng
*/
public interface IBatchDAO extends DAO {
void batchPersistence(List<?> batchCollection);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
public interface IPersistenceDAO<INSERT, UPDATE, INPUT extends Indicator> extends DAO {
INPUT get(INPUT input) throws IOException;
INSERT prepareBatchInsert(INPUT input) throws IOException;
UPDATE prepareBatchUpdate(INPUT input) throws IOException;
void deleteHistory(Long timeBucketBefore);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
/**
* @author peng-yongsheng
*/
public class StorageException extends Exception {
public StorageException(String message) {
super(message);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.define.IndicatorMapper;
import org.apache.skywalking.oap.server.core.storage.annotation.ColumnAnnotationRetrieval;
import org.apache.skywalking.oap.server.core.storage.define.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public abstract class StorageInstaller {
private static final Logger logger = LoggerFactory.getLogger(StorageInstaller.class);
private final ModuleManager moduleManager;
private final ColumnAnnotationRetrieval annotationRetrieval;
public StorageInstaller(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.annotationRetrieval = new ColumnAnnotationRetrieval();
}
public final void install(Client client) throws StorageException {
IndicatorMapper indicatorMapper = moduleManager.find(CoreModule.NAME).getService(IndicatorMapper.class);
Collection<Class<Indicator>> indicatorClasses = indicatorMapper.indicatorClasses();
Boolean debug = System.getProperty("debug") != null;
for (Class<Indicator> indicatorClass : indicatorClasses) {
List<ColumnDefine> columnDefines = annotationRetrieval.retrieval(indicatorClass);
String tableName;
try {
tableName = indicatorClass.newInstance().name();
} catch (InstantiationException | IllegalAccessException e) {
throw new StorageException(e.getMessage());
}
TableDefine tableDefine = new TableDefine(tableName, columnDefines);
if (!isExists(client, tableDefine)) {
logger.info("table: {} not exists", tableDefine.getName());
createTable(client, tableDefine);
} else if (debug) {
logger.info("table: {} exists", tableDefine.getName());
deleteTable(client, tableDefine);
createTable(client, tableDefine);
}
columnCheck(client, tableDefine);
}
}
protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException;
protected abstract void columnCheck(Client client, TableDefine tableDefine) throws StorageException;
protected abstract void deleteTable(Client client, TableDefine tableDefine) throws StorageException;
protected abstract void createTable(Client client, TableDefine tableDefine) throws StorageException;
}
......@@ -32,6 +32,6 @@ public class StorageModule extends ModuleDefine {
}
@Override public Class[] services() {
return new Class[] {};
return new Class[] {IBatchDAO.class, IPersistenceDAO.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {
String columnName();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.annotation;
import java.lang.reflect.Field;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.define.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ColumnAnnotationRetrieval {
private static final Logger logger = LoggerFactory.getLogger(ColumnAnnotationRetrieval.class);
public List<ColumnDefine> retrieval(Class<Indicator> indicatorClass) {
if (logger.isDebugEnabled()) {
logger.debug("Retrieval column annotation from class {}", indicatorClass.getName());
}
List<ColumnDefine> columnDefines = new LinkedList<>();
retrieval(indicatorClass, columnDefines);
return columnDefines;
}
private void retrieval(Class clazz, List<ColumnDefine> columnDefines) {
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (field.isAnnotationPresent(Column.class)) {
Column column = field.getAnnotation(Column.class);
columnDefines.add(new ColumnDefine(new ColumnName(column.columnName(), column.columnName()), field.getType()));
if (logger.isDebugEnabled()) {
logger.debug("The field named {} with the {} type", column.columnName(), field.getType());
}
}
}
if (Objects.nonNull(clazz.getSuperclass())) {
retrieval(clazz.getSuperclass(), columnDefines);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.define;
/**
* @author peng-yongsheng
*/
public class ColumnDefine {
private final ColumnName columnName;
private final Class<?> type;
public ColumnDefine(ColumnName columnName, Class<?> type) {
this.columnName = columnName;
this.type = type;
}
public final ColumnName getColumnName() {
return columnName;
}
public final Class<?> getType() {
return type;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.define;
/**
* @author peng-yongsheng
*/
public class ColumnName {
private final String fullName;
private final String shortName;
private boolean useShortName = false;
public ColumnName(String fullName, String shortName) {
this.fullName = fullName;
this.shortName = shortName;
}
public String getName() {
return useShortName ? shortName : fullName;
}
public void useShortName() {
this.useShortName = true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.define;
/**
* @author peng-yongsheng
*/
public interface ColumnTypeMapping {
String transform(Class<?> type);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage.define;
import java.util.List;
/**
* @author peng-yongsheng
*/
public class TableDefine {
private final String name;
private final List<ColumnDefine> columnDefines;
public TableDefine(String name, List<ColumnDefine> columnDefines) {
this.name = name;
this.columnDefines = columnDefines;
}
public final String getName() {
return name;
}
public final List<ColumnDefine> getColumnDefines() {
return columnDefines;
}
}
......@@ -18,8 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.indicator.define;
import java.util.Map;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
/**
......@@ -35,4 +36,20 @@ public class TestAvgIndicator extends AvgIndicator {
@Override public void deserialize(RemoteData remoteData) {
}
@Override public String id() {
return null;
}
@Override public String name() {
return null;
}
@Override public Map<String, Object> toMap() {
return null;
}
@Override public Indicator newOne(Map<String, Object> dbMap) {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.LinkedList;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.define.*;
import org.apache.skywalking.oap.server.core.storage.define.TableDefine;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.module.*;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
/**
* @author peng-yongsheng
*/
public class StorageInstallerTestCase {
@Test
public void testInstall() throws StorageException, DuplicateProviderException, ServiceNotProvidedException, IndicatorDefineLoadException {
IndicatorMapper indicatorMapper = new IndicatorMapper();
CoreModuleProvider moduleProvider = Mockito.mock(CoreModuleProvider.class);
CoreModule moduleDefine = Mockito.spy(CoreModule.class);
ModuleManager moduleManager = Mockito.mock(ModuleManager.class);
LinkedList<ModuleProvider> moduleProviders = Whitebox.getInternalState(moduleDefine, "loadedProviders");
moduleProviders.add(moduleProvider);
Mockito.when(moduleManager.find(CoreModule.NAME)).thenReturn(moduleDefine);
Mockito.when(moduleProvider.getService(IndicatorMapper.class)).thenReturn(indicatorMapper);
indicatorMapper.load();
TestStorageInstaller installer = new TestStorageInstaller(moduleManager);
installer.install(null);
}
class TestStorageInstaller extends StorageInstaller {
public TestStorageInstaller(ModuleManager moduleManager) {
super(moduleManager);
}
@Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
return false;
}
@Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
}
@Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
}
@Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
}
}
}
......@@ -49,7 +49,7 @@
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import java.io.IOException;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.*;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.*;
import org.elasticsearch.action.admin.indices.create.*;
import org.elasticsearch.action.admin.indices.delete.*;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ElasticSearchClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
private static final String TYPE = "type";
private final String clusterNodes;
private final NameSpace namespace;
private RestHighLevelClient client;
public ElasticSearchClient(String clusterNodes, NameSpace namespace) {
this.clusterNodes = clusterNodes;
this.namespace = namespace;
}
@Override public void initialize() {
List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
client = new RestHighLevelClient(
RestClient.builder(pairsList.toArray(new HttpHost[0])));
}
@Override public void shutdown() {
try {
client.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
private List<HttpHost> parseClusterNodes(String nodes) {
List<HttpHost> httpHosts = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes);
String[] nodesSplit = nodes.split(",");
for (String node : nodesSplit) {
String host = node.split(":")[0];
String port = node.split(":")[1];
httpHosts.add(new HttpHost(host, Integer.valueOf(port)));
}
return httpHosts;
}
public boolean createIndex(String indexName, Settings settings,
XContentBuilder mappingBuilder) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings);
request.mapping(TYPE, mappingBuilder);
CreateIndexResponse response;
response = client.indices().create(request);
logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean deleteIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
DeleteIndexResponse response;
response = client.indices().delete(request);
logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean isExistsIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
GetIndexRequest request = new GetIndexRequest();
request.indices(indexName);
return client.indices().exists(request);
}
public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source(searchSourceBuilder);
return client.search(searchRequest);
}
public GetResponse get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
GetRequest request = new GetRequest(indexName, TYPE, id);
return client.get(request);
}
public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
indexName = formatIndexName(indexName);
return new IndexRequest(indexName, TYPE, id).source(source);
}
public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
indexName = formatIndexName(indexName);
return new UpdateRequest(indexName, TYPE, id).doc(source);
}
public void delete(String indexName, String timeBucketColumnName, long startTimeBucket,
long endTimeBucket) throws IOException {
indexName = formatIndexName(indexName);
Map<String, String> params = Collections.singletonMap("pretty", "true");
String jsonString = "{" +
" \"query\": {" +
" \"range\": {" +
" \"" + timeBucketColumnName + "\": {" +
" \"gte\": " + startTimeBucket + "," +
" \"lte\": " + endTimeBucket + "" +
" }" +
" }" +
" }" +
"}";
HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
}
private String formatIndexName(String indexName) {
if (Objects.nonNull(namespace) && StringUtils.isNotEmpty(namespace.getNameSpace())) {
return namespace.getNameSpace() + "_" + indexName;
}
return indexName;
}
public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval,
int concurrentRequests) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
}
};
return BulkProcessor.builder(client::bulkAsync, listener)
.setBulkActions(bulkActions)
.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
.setConcurrentRequests(concurrentRequests)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import org.apache.skywalking.oap.server.library.client.ClientException;
/**
* @author peng-yongsheng
*/
public class ElasticSearchClientException extends ClientException {
public ElasticSearchClientException(String message) {
super(message);
}
public ElasticSearchClientException(String message, Throwable cause) {
super(message, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.client.elasticsearch;
import java.io.IOException;
import org.apache.skywalking.oap.server.library.client.ClientException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.junit.Assert;
/**
* @author peng-yongsheng
*/
public class ElasticSearchClientTestCase {
public static void main(String[] args) throws IOException, ClientException {
Settings settings = Settings.builder()
.put("number_of_shards", 2)
.put("number_of_replicas", 0)
.build();
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject()
.startObject("_all")
.field("enabled", false)
.endObject()
.startObject("properties")
.startObject("column1")
.field("type", "text")
.endObject()
.endObject();
builder.endObject();
ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null);
client.initialize();
String indexName = "test";
client.createIndex(indexName, settings, builder);
Assert.assertTrue(client.isExistsIndex(indexName));
client.deleteIndex(indexName);
Assert.assertFalse(client.isExistsIndex(indexName));
client.shutdown();
}
}
......@@ -18,11 +18,9 @@
package org.apache.skywalking.oap.server.library.module;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.util.*;
import org.slf4j.*;
/**
* A module definition.
......@@ -31,7 +29,7 @@ import java.util.*;
*/
public abstract class ModuleDefine {
private final Logger logger = LoggerFactory.getLogger(ModuleDefine.class);
private static final Logger logger = LoggerFactory.getLogger(ModuleDefine.class);
private LinkedList<ModuleProvider> loadedProviders = new LinkedList<>();
......@@ -128,7 +126,7 @@ public abstract class ModuleDefine {
return loadedProviders;
}
final ModuleProvider provider() throws DuplicateProviderException {
public final ModuleProvider provider() throws DuplicateProviderException {
if (loadedProviders.size() > 1) {
throw new DuplicateProviderException(this.name() + " module exist " + loadedProviders.size() + " providers");
}
......
......@@ -35,6 +35,20 @@ core:
gRPCPort: 11800
storage:
elasticsearch:
clusterNodes: localhost:9200
indexShardsNumber: 2
indexReplicasNumber: 0
# Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
bulkActions: 2000 # Execute the bulk every 2000 requests
bulkSize: 20 # flush the bulk every 20mb
flushInterval: 10 # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: 2 # the number of concurrent requests
# Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted.
traceDataTTL: 90 # Unit is minute
minuteMetricDataTTL: 90 # Unit is minute
hourMetricDataTTL: 36 # Unit is hour
dayMetricDataTTL: 45 # Unit is day
monthMetricDataTTL: 18 # Unit is month
service-mesh:
default:
query:
......
......@@ -36,5 +36,10 @@
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
......@@ -25,6 +26,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
*/
public class StorageModuleElasticsearchConfig extends ModuleConfig {
@Setter @Getter private String nameSpace;
@Setter @Getter private String clusterNodes;
private int indexShardsNumber;
private int indexReplicasNumber;
private boolean highPerformanceMode;
......
......@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.slf4j.*;
/**
......@@ -29,11 +32,14 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
private final StorageModuleElasticsearchConfig storageConfig;
private final StorageModuleElasticsearchConfig config;
private final NameSpace nameSpace;
private ElasticSearchClient elasticSearchClient;
public StorageModuleElasticsearchProvider() {
super();
this.storageConfig = new StorageModuleElasticsearchConfig();
this.config = new StorageModuleElasticsearchConfig();
this.nameSpace = new NameSpace();
}
@Override
......@@ -42,21 +48,34 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
}
@Override
public Class module() {
public Class<? extends ModuleDefine> module() {
return StorageModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return storageConfig;
return config;
}
@Override
public void prepare() throws ServiceNotProvidedException {
elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), nameSpace);
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(IPersistenceDAO.class, new PersistenceEsDAO(elasticSearchClient, nameSpace));
}
@Override
public void start() throws ModuleStartException {
try {
nameSpace.setNameSpace(config.getNameSpace());
elasticSearchClient.initialize();
StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
installer.install(elasticSearchClient);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
private static final Logger logger = LoggerFactory.getLogger(BatchProcessEsDAO.class);
private BulkProcessor bulkProcessor;
private final int bulkActions;
private final int bulkSize;
private final int flushInterval;
private final int concurrentRequests;
public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
int concurrentRequests) {
super(client);
this.bulkActions = bulkActions;
this.bulkSize = bulkSize;
this.flushInterval = flushInterval;
this.concurrentRequests = concurrentRequests;
}
@Override public void batchPersistence(List<?> batchCollection) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
}
if (logger.isDebugEnabled()) {
logger.debug("bulk data size: {}", batchCollection.size());
}
if (CollectionUtils.isNotEmpty(batchCollection)) {
batchCollection.forEach(builder -> {
if (builder instanceof IndexRequest) {
this.bulkProcessor.add((IndexRequest)builder);
}
if (builder instanceof UpdateRequest) {
this.bulkProcessor.add((UpdateRequest)builder);
}
});
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.apache.skywalking.oap.server.core.storage.define.ColumnTypeMapping;
/**
* @author peng-yongsheng
*/
public class ColumnTypeEsMapping implements ColumnTypeMapping {
@Override public String transform(Class<?> type) {
if (Integer.class.equals(type) || int.class.equals(type)) {
return "integer";
} else if (Long.class.equals(type) || long.class.equals(type)) {
return "long";
} else if (Double.class.equals(type) || double.class.equals(type)) {
return "double";
} else if (String.class.equals(type)) {
return "text";
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
private static final Logger logger = LoggerFactory.getLogger(EsDAO.class);
public EsDAO(ElasticSearchClient client) {
super(client);
}
protected final int getMaxId(String indexName, String columnName) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(AggregationBuilders.max("agg").field(columnName));
searchSourceBuilder.size(0);
return getResponse(indexName, searchSourceBuilder);
}
protected final int getMinId(String indexName, String columnName) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(AggregationBuilders.min("agg").field(columnName));
searchSourceBuilder.size(0);
return getResponse(indexName, searchSourceBuilder);
}
private int getResponse(String indexName, SearchSourceBuilder searchSourceBuilder) {
try {
SearchResponse searchResponse = getClient().search(indexName, searchSourceBuilder);
Max agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return 0;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import java.util.Map;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.IPersistenceDAO;
import org.apache.skywalking.oap.server.library.client.NameSpace;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.*;
/**
* @author peng-yongsheng
*/
public class PersistenceEsDAO implements IPersistenceDAO<IndexRequest, UpdateRequest, Indicator> {
private final ElasticSearchClient client;
private final NameSpace nameSpace;
public PersistenceEsDAO(ElasticSearchClient client, NameSpace nameSpace) {
this.client = client;
this.nameSpace = nameSpace;
}
@Override public Indicator get(Indicator input) throws IOException {
GetResponse response = client.get(nameSpace.getNameSpace() + "_" + input.name(), input.id());
if (response.isExists()) {
return input.newOne(response.getSource());
} else {
return null;
}
}
@Override public IndexRequest prepareBatchInsert(Indicator input) throws IOException {
Map<String, Object> objectMap = input.toMap();
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (String key : objectMap.keySet()) {
builder.field(key, objectMap.get(key));
}
builder.endObject();
return client.prepareInsert(nameSpace.getNameSpace() + "_" + input.name(), input.id(), builder);
}
@Override public UpdateRequest prepareBatchUpdate(Indicator input) throws IOException {
Map<String, Object> objectMap = input.toMap();
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (String key : objectMap.keySet()) {
builder.field(key, objectMap.get(key));
}
builder.endObject();
return client.prepareUpdate(nameSpace.getNameSpace() + "_" + input.name(), input.id(), builder);
}
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.define.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class StorageEsInstaller extends StorageInstaller {
private static final Logger logger = LoggerFactory.getLogger(StorageEsInstaller.class);
private final int indexShardsNumber;
private final int indexReplicasNumber;
private final ColumnTypeEsMapping mapping;
public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
super(moduleManager);
this.indexShardsNumber = indexShardsNumber;
this.indexReplicasNumber = indexReplicasNumber;
this.mapping = new ColumnTypeEsMapping();
}
@Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
try {
return esClient.isExistsIndex(tableDefine.getName());
} catch (IOException e) {
throw new StorageException(e.getMessage());
}
}
@Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
}
@Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
try {
if (!esClient.deleteIndex(tableDefine.getName())) {
throw new StorageException(tableDefine.getName() + " index delete failure.");
}
} catch (IOException e) {
throw new StorageException(tableDefine.getName() + " index delete failure.");
}
}
@Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
// mapping
XContentBuilder mappingBuilder = null;
Settings settings = createSettingBuilder();
try {
mappingBuilder = createMappingBuilder(tableDefine);
logger.info("mapping builder str: {}", mappingBuilder.prettyPrint());
} catch (Exception e) {
logger.error("create {} index mapping builder error", tableDefine.getName());
}
boolean isAcknowledged;
try {
isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mappingBuilder);
} catch (IOException e) {
throw new StorageException(e.getMessage());
}
logger.info("create {} index finished, isAcknowledged: {}", tableDefine.getName(), isAcknowledged);
if (!isAcknowledged) {
throw new StorageException("create " + tableDefine.getName() + " index failure, ");
}
}
private Settings createSettingBuilder() {
return Settings.builder()
.put("index.number_of_shards", indexShardsNumber)
.put("index.number_of_replicas", indexReplicasNumber)
.put("index.refresh_interval", "3s")
.put("analysis.analyzer.collector_analyzer.type", "stop")
.build();
}
private XContentBuilder createMappingBuilder(TableDefine tableDefine) throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("_all")
.field("enabled", false)
.endObject()
.startObject("properties");
for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
mappingBuilder
.startObject(columnDefine.getColumnName().getName())
.field("type", mapping.transform(columnDefine.getType()))
.endObject();
}
mappingBuilder
.endObject()
.endObject();
logger.debug("create elasticsearch index: {}", mappingBuilder.prettyPrint());
return mappingBuilder;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.junit.*;
/**
* @author peng-yongsheng
*/
public class ElasticSearchColumnTypeMappingTestCase {
@Test
public void test() {
ColumnTypeEsMapping mapping = new ColumnTypeEsMapping();
Assert.assertEquals("integer", mapping.transform(int.class));
Assert.assertEquals("integer", mapping.transform(Integer.class));
Assert.assertEquals("long", mapping.transform(long.class));
Assert.assertEquals("long", mapping.transform(Long.class));
Assert.assertEquals("double", mapping.transform(double.class));
Assert.assertEquals("double", mapping.transform(Double.class));
Assert.assertEquals("text", mapping.transform(String.class));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册