未验证 提交 5e8f1eb5 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support multiple implementations of StorageBuilder in different storage...

Support multiple implementations of StorageBuilder in different storage implementations - stage 2 (#6336)
上级 fc23dabf
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import com.google.common.collect.Maps;
import org.apache.skywalking.apm.network.language.agent.v3.Label;
import org.apache.skywalking.apm.network.language.agent.v3.MeterBucketValue;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterHistogram;
import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfigs;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import java.util.HashMap;
import java.util.List;
import static org.mockito.Mockito.when;
public abstract class MeterBaseTest {
private static final String CONFIG_PATH = "meter-analyzer-config";
@Mock
protected CoreModuleProvider moduleProvider;
@Mock
protected ModuleManager moduleManager;
protected MeterSystem meterSystem;
protected MeterProcessor processor;
protected long timestamp;
@Before
public void init() throws Exception {
// prepare the context
meterSystem = Mockito.spy(new MeterSystem(moduleManager));
CoreModule coreModule = Mockito.spy(CoreModule.class);
// disable meter register
DisableRegister.INSTANCE.add("meter_build_test1");
DisableRegister.INSTANCE.add("meter_build_test2");
DisableRegister.INSTANCE.add("meter_build_test3");
Whitebox.setInternalState(coreModule, "loadedProvider", moduleProvider);
when(moduleManager.find(CoreModule.NAME)).thenReturn(coreModule);
when(moduleProvider.getService(MeterSystem.class))
.thenReturn(meterSystem);
// prepare the meter functions
final HashMap<String, Class> map = Maps.newHashMap();
map.put("avg", AvgFunction.class);
map.put("avgHistogram", AvgHistogramFunction.class);
map.put("avgHistogramPercentile", AvgHistogramPercentileFunction.class);
Whitebox.setInternalState(meterSystem, "functionRegister", map);
// load context
List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(CONFIG_PATH, new String[] {"config.yaml"});
final MeterProcessService service = new MeterProcessService(moduleManager);
service.start(meterConfigs);
// create process and read meters
processor = service.createProcessor();
timestamp = System.currentTimeMillis();
// single value
processor.read(MeterData.newBuilder()
.setService("service").setServiceInstance("instance").setTimestamp(timestamp)
.setSingleValue(MeterSingleValue.newBuilder().setName("test_count1")
.addLabels(Label.newBuilder()
.setName("k1")
.setValue("v1")
.build()).setValue(1).build())
.build());
// histogram
processor.read(MeterData.newBuilder()
.setHistogram(MeterHistogram.newBuilder().setName("test_histogram")
.addLabels(
Label.newBuilder().setName("k2").setValue("v2").build())
.addLabels(
Label.newBuilder().setName("endpoint").setValue("test_endpoint").build())
.addValues(MeterBucketValue.newBuilder()
.setBucket(1)
.setCount(10)
.build())
.addValues(MeterBucketValue.newBuilder()
.setBucket(5)
.setCount(15)
.build())
.addValues(MeterBucketValue.newBuilder()
.setBucket(10)
.setCount(3)
.build())
.build())
.build());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"com.sun.org.apache.xerces.*", "javax.xml.*", "org.xml.*", "javax.management.*", "org.w3c.*"})
public class MeterBuilderTest extends MeterBaseTest {
@Test
public void testBuildAndSend() throws ModuleStartException {
List<AcceptableValue> values = new ArrayList<>();
doAnswer(invocationOnMock -> {
values.add(invocationOnMock.getArgument(0, AcceptableValue.class));
return null;
}).when(meterSystem).doStreamingCalculation(any());
// Prcess the meters
processor.process();
Assert.assertEquals(3, values.size());
// Check avg
final AvgFunction avg = (AvgFunction) values.get(0);
Assert.assertEquals(1, avg.getSummation());
Assert.assertEquals(1, avg.getCount());
Assert.assertEquals(IDManager.ServiceID.buildId("service", true), avg.getServiceId());
Assert.assertEquals(IDManager.ServiceID.buildId("service", true), avg.getEntityId());
Assert.assertEquals(TimeBucket.getMinuteTimeBucket(timestamp), avg.getTimeBucket());
// Check avgHistogram
final AvgHistogramFunction avgHistogram = (AvgHistogramFunction) values.get(1);
verifyDataTable(avgHistogram.getSummation(), 1, 10, 5, 15, 10, 3);
verifyDataTable(avgHistogram.getCount(), 1, 1, 5, 1, 10, 1);
Assert.assertEquals(IDManager.ServiceInstanceID.buildId(
IDManager.ServiceID.buildId("service", true), "instance"), avgHistogram.getEntityId());
Assert.assertEquals(TimeBucket.getMinuteTimeBucket(timestamp), avgHistogram.getTimeBucket());
// Check avgHistogramPercentile
final AvgHistogramPercentileFunction avgPercentile = (AvgHistogramPercentileFunction) values.get(2);
Assert.assertEquals(3, avgPercentile.getRanks().size());
Assert.assertEquals(50, avgPercentile.getRanks().get(0));
Assert.assertEquals(90, avgPercentile.getRanks().get(1));
Assert.assertEquals(99, avgPercentile.getRanks().get(2));
Assert.assertEquals(IDManager.EndpointID.buildId(
IDManager.ServiceID.buildId("service", true), "test_endpoint"), avgPercentile.getEntityId());
verifyDataTable(avgPercentile.getSummation(), 1, 10, 5, 15, 10, 3);
verifyDataTable(avgPercentile.getCount(), 1, 1, 5, 1, 10, 1);
}
private void verifyDataTable(DataTable table, Object... data) {
Assert.assertEquals(data.length / 2, table.size());
for (int i = 0; i < data.length; i += 2) {
Assert.assertEquals(
Long.parseLong(String.valueOf(data[i + 1])), table.get(String.valueOf(data[i])).longValue());
}
}
}
......@@ -65,6 +65,7 @@ import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.oal.rt.OALCompileException;
import org.apache.skywalking.oap.server.core.oal.rt.OALDefine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngine;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
......@@ -80,7 +81,6 @@ public class OALRuntime implements OALEngine {
private static final String CLASS_FILE_CHARSET = "UTF-8";
private static final String METRICS_FUNCTION_PACKAGE = "org.apache.skywalking.oap.server.core.analysis.metrics.";
private static final String WITH_METADATA_INTERFACE = "org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata";
private static final String STORAGE_BUILDER_INTERFACE = "org.apache.skywalking.oap.server.core.storage.StorageHashMapBuilder";
private static final String DISPATCHER_INTERFACE = "org.apache.skywalking.oap.server.core.analysis.SourceDispatcher";
private static final String METRICS_STREAM_PROCESSOR = "org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor";
private static final String[] METRICS_CLASS_METHODS = {
......@@ -107,6 +107,7 @@ public class OALRuntime implements OALEngine {
private AllDispatcherContext allDispatcherContext;
private StreamAnnotationListener streamAnnotationListener;
private DispatcherDetectorListener dispatcherDetectorListener;
private StorageBuilderFactory storageBuilderFactory;
private final List<Class> metricsClasses;
private final List<Class> dispatcherClasses;
private final boolean openEngineDebug;
......@@ -133,6 +134,11 @@ public class OALRuntime implements OALEngine {
dispatcherDetectorListener = listener;
}
@Override
public void setStorageBuilderFactory(final StorageBuilderFactory factory) {
storageBuilderFactory = factory;
}
@Override
public void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException {
if (!IS_RT_TEMP_FOLDER_INIT_COMPLETED) {
......@@ -318,7 +324,7 @@ public class OALRuntime implements OALEngine {
String className = metricsBuilderClassName(metricsStmt, false);
CtClass metricsBuilderClass = classPool.makeClass(metricsBuilderClassName(metricsStmt, true));
try {
metricsBuilderClass.addInterface(classPool.get(STORAGE_BUILDER_INTERFACE));
metricsBuilderClass.addInterface(classPool.get(storageBuilderFactory.builderTemplate().getSuperClass()));
} catch (NotFoundException e) {
log.error("Can't find StorageBuilder interface for " + className + ".", e);
throw new OALCompileException(e.getMessage(), e);
......@@ -342,7 +348,9 @@ public class OALRuntime implements OALEngine {
for (String method : METRICS_BUILDER_CLASS_METHODS) {
StringWriter methodEntity = new StringWriter();
try {
configuration.getTemplate("metrics-builder/" + method + ".ftl").process(metricsStmt, methodEntity);
configuration
.getTemplate(storageBuilderFactory.builderTemplate().getTemplatePath() + "/" + method + ".ftl")
.process(metricsStmt, methodEntity);
metricsBuilderClass.addMethod(CtNewMethod.make(methodEntity.toString(), metricsBuilderClass));
} catch (Exception e) {
log.error("Can't generate method " + method + " for " + className + ".", e);
......
......@@ -51,6 +51,10 @@ public class StreamAnnotationListener implements AnnotationListener {
if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream) aClass.getAnnotation(Stream.class);
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
......
......@@ -18,22 +18,24 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.management.ManagementData;
import org.apache.skywalking.oap.server.core.storage.IManagementDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
......@@ -60,15 +62,16 @@ public class ManagementStreamProcessor implements StreamProcessor<ManagementData
@Override
public void create(final ModuleDefineHolder moduleDefineHolder, final Stream stream, final Class<? extends ManagementData> streamClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(streamClass, stream.builder());
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IManagementDAO managementDAO;
try {
managementDAO = storageDAO.newManagementDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
managementDAO = storageDAO.newManagementDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.builder()
.getSimpleName() + " none stream record DAO failure.", e);
}
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -26,7 +27,6 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.MetricsExtension;
import org.apache.skywalking.oap.server.core.analysis.Stream;
......@@ -35,12 +35,14 @@ import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -103,15 +105,16 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
public void create(ModuleDefineHolder moduleDefineHolder,
StreamDefinition stream,
Class<? extends Metrics> metricsClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.getName())) {
return;
}
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(metricsClass, stream.getBuilder());
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IMetricsDAO metricsDAO;
try {
metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
metricsDAO = storageDAO.newMetricsDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
}
......
......@@ -18,22 +18,24 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.config.NoneStream;
import org.apache.skywalking.oap.server.core.storage.INoneStreamDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
......@@ -60,15 +62,16 @@ public class NoneStreamProcessor implements StreamProcessor<NoneStream> {
@Override
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends NoneStream> streamClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(streamClass, stream.builder());
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
INoneStreamDAO noneStream;
try {
noneStream = storageDAO.newNoneStreamDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
noneStream = storageDAO.newNoneStreamDao(builder.getDeclaredConstructor().newInstance());
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.builder()
.getSimpleName() + " none stream record DAO failure.", e);
}
......
......@@ -18,22 +18,24 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
public class RecordStreamProcessor implements StreamProcessor<Record> {
......@@ -57,15 +59,16 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
@Override
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(recordClass, stream.builder());
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
}
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -26,19 +27,20 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
/**
......@@ -66,17 +68,18 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
@Override
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) throws StorageException {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(topNClass, stream.builder());
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
} catch (InstantiationException | IllegalAccessException e) {
throw new UnexpectedException("Create " + stream.builder()
.getSimpleName() + " top n record DAO failure.", e);
recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException(
"Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e);
}
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.oal.rt;
import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
/**
......@@ -30,6 +31,8 @@ public interface OALEngine {
void setDispatcherListener(DispatcherDetectorListener listener) throws ModuleStartException;
void setStorageBuilderFactory(StorageBuilderFactory factory);
void start(ClassLoader currentClassLoader) throws ModuleStartException, OALCompileException;
void notifyAllListeners() throws ModuleStartException;
......
......@@ -25,6 +25,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
......@@ -56,6 +58,9 @@ public class OALEngineLoaderService implements Service {
.provider()
.getService(SourceReceiver.class)
.getDispatcherDetectorListener());
engine.setStorageBuilderFactory(moduleManager.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class));
engine.start(OALEngineLoaderService.class.getClassLoader());
engine.notifyAllListeners();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.HashMap;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* StorageBuilderFactory provides the capabilities to override the default storage builders, which are implementations
* of {@link StorageHashMapBuilder}.
*
* Typically, the storage needs to provide a more native format rather than {@link java.util.HashMap}.
*/
public interface StorageBuilderFactory extends Service {
/**
* @return the builder definition for OAL Engine.
*/
BuilderTemplateDefinition builderTemplate();
/**
* Fetch the real builder by the given type of stream data and the static declared by the {@link Stream#builder()}.
*
* @param dataType of the stream data.
* @param defaultBuilder static builder.
* @return the builder used in the runtime.
*/
Class<? extends StorageBuilder> builderOf(Class<? extends StorageData> dataType,
Class<? extends StorageBuilder> defaultBuilder);
@Getter
@RequiredArgsConstructor
class BuilderTemplateDefinition {
/**
* The parent class of the generator builder.
*/
private final String superClass;
/**
* This folder includes entity2Storage.ftl and storage2Entity.ftl to support the builder's generation.
*/
private final String templatePath;
}
/**
* The default storage builder. Use {@link StorageHashMapBuilder} to provide general suitable entity builder
* implementation, which deliver {@link HashMap} to storage module implementation.
*/
class Default implements StorageBuilderFactory {
@Override
public BuilderTemplateDefinition builderTemplate() {
return new BuilderTemplateDefinition(
StorageHashMapBuilder.class.getName(), "metrics-builder");
}
@Override
public Class<? extends StorageBuilder> builderOf(final Class<? extends StorageData> dataType,
final Class<? extends StorageBuilder> defaultBuilder) {
return defaultBuilder;
}
}
}
......@@ -49,25 +49,26 @@ public class StorageModule extends ModuleDefine {
@Override
public Class[] services() {
return new Class[]{
IBatchDAO.class,
StorageDAO.class,
IHistoryDeleteDAO.class,
INetworkAddressAliasDAO.class,
ITopologyQueryDAO.class,
IMetricsQueryDAO.class,
ITraceQueryDAO.class,
IMetadataQueryDAO.class,
IAggregationQueryDAO.class,
IAlarmQueryDAO.class,
ITopNRecordsQueryDAO.class,
ILogQueryDAO.class,
IProfileTaskQueryDAO.class,
IProfileTaskLogQueryDAO.class,
IProfileThreadSnapshotQueryDAO.class,
UITemplateManagementDAO.class,
IBrowserLogQueryDAO.class,
IEventQueryDAO.class
return new Class[] {
StorageBuilderFactory.class,
IBatchDAO.class,
StorageDAO.class,
IHistoryDeleteDAO.class,
INetworkAddressAliasDAO.class,
ITopologyQueryDAO.class,
IMetricsQueryDAO.class,
ITraceQueryDAO.class,
IMetadataQueryDAO.class,
IAggregationQueryDAO.class,
IAlarmQueryDAO.class,
ITopNRecordsQueryDAO.class,
ILogQueryDAO.class,
IProfileTaskQueryDAO.class,
IProfileTaskLogQueryDAO.class,
IProfileThreadSnapshotQueryDAO.class,
UITemplateManagementDAO.class,
IBrowserLogQueryDAO.class,
IEventQueryDAO.class
};
}
}
......@@ -31,6 +31,7 @@ import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
......@@ -113,6 +114,8 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
......
......@@ -29,6 +29,7 @@ import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
......@@ -112,6 +113,8 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
......
......@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
......@@ -95,6 +96,8 @@ public class InfluxStorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
client = new InfluxClient(config);
this.registerServiceImplementation(IBatchDAO.class, new BatchDAO(client));
......
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
......@@ -107,6 +108,8 @@ public class H2StorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
Properties settings = new Properties();
settings.setProperty("dataSourceClassName", config.getDriver());
settings.setProperty("dataSource.url", config.getUrl());
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
......@@ -98,6 +99,8 @@ public class MySQLStorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
mysqlClient = new JDBCHikariCPClient(config.getProperties());
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
......@@ -101,6 +102,8 @@ public class TiDBStorageProvider extends ModuleProvider {
@Override
public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
mysqlClient = new JDBCHikariCPClient(config.getProperties());
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册