提交 f658d9eb 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Simplify the PxxMetrics and ThermodynamicMetrics to improve performance (#3162)

* Feature of database session

* Make it configurable.

* Change the metrics process flow.
before: metrics entrance -> aggregate worker -> remote worker -> trans worker -> minute, hour, day, month persistence worker -> storage
after: metrics entrance -> aggregate worker -> remote worker -> minute persistence worker ->  trans worker -> hour, day, month persistence worker -> storage

* IntKeyLongValueHashMap instead of IntKeyLongValueArray.

* Make the OAP server can't startup.

* Finish

* Rename the method and fixed some test case issues.

* Rename field.

* no message
上级 0cf99802
......@@ -136,8 +136,8 @@ public class AnalysisResult {
case "long":
serializeFields.addLongField(column.getFieldName());
break;
case "IntKeyLongValueArray":
serializeFields.addIntLongValuePairelistField(column.getFieldName());
case "IntKeyLongValueHashMap":
serializeFields.addIntKeyLongValueHashMapField(column.getFieldName());
break;
default:
throw new IllegalStateException("Unexpected field type [" + type + "] of persistence column [" + column.getFieldName() + "]");
......
......@@ -25,7 +25,7 @@ public class PersistenceColumns {
private List<PersistenceField> longFields = new LinkedList<>();
private List<PersistenceField> doubleFields = new LinkedList<>();
private List<PersistenceField> intFields = new LinkedList<>();
private List<PersistenceField> intLongValuePairListFields = new LinkedList<>();
private List<PersistenceField> intKeyLongValueHashMap = new LinkedList<>();
public void addStringField(String fieldName) {
stringFields.add(new PersistenceField(fieldName));
......@@ -43,8 +43,8 @@ public class PersistenceColumns {
intFields.add(new PersistenceField(fieldName));
}
public void addIntLongValuePairelistField(String fieldName) {
intLongValuePairListFields.add(new PersistenceField(fieldName));
public void addIntKeyLongValueHashMapField(String fieldName) {
intKeyLongValueHashMap.add(new PersistenceField(fieldName));
}
public List<PersistenceField> getStringFields() {
......@@ -63,7 +63,7 @@ public class PersistenceColumns {
return intFields;
}
public List<PersistenceField> getIntLongValuePairListFields() {
return intLongValuePairListFields;
public List<PersistenceField> getIntKeyLongValueHashMapFields() {
return intKeyLongValueHashMap;
}
}
......@@ -15,13 +15,13 @@ public void deserialize(org.apache.skywalking.oap.server.core.remote.grpc.proto.
${field.setter}(remoteData.getDataIntegers(${field?index}));
</#list>
<#list serializeFields.intLongValuePairListFields as field>
setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueArray(30));
<#list serializeFields.intKeyLongValueHashMapFields as field>
setDetailGroup(new org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap(30));
java.util.Iterator iterator = remoteData.getDataIntLongPairListList().iterator();
while (iterator.hasNext()) {
org.apache.skywalking.oap.server.core.remote.grpc.proto.IntKeyLongValuePair element = (org.apache.skywalking.oap.server.core.remote.grpc.proto.IntKeyLongValuePair)(iterator.next());
super.getDetailGroup().add(new org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValue(element.getKey(), element.getValue()));
super.getDetailGroup().put(new Integer(element.getKey()), new org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValue(element.getKey(), element.getValue()));
}
</#list>
}
\ No newline at end of file
......@@ -15,8 +15,8 @@ public org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData.Builde
<#list serializeFields.intFields as field>
remoteBuilder.addDataIntegers(${field.getter}());
</#list>
<#list serializeFields.intLongValuePairListFields as field>
java.util.Iterator iterator = super.getDetailGroup().iterator();
<#list serializeFields.intKeyLongValueHashMapFields as field>
java.util.Iterator iterator = super.getDetailGroup().values().iterator();
while (iterator.hasNext()) {
remoteBuilder.addDataIntLongPairList(((org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValue)(iterator.next())).serialize());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.core.analysis.metrics;
/**
* @author peng-yongsheng
*/
public abstract class GroupMetrics extends Metrics {
protected void combine(IntKeyLongValueHashMap source, IntKeyLongValueHashMap target) {
source.forEach((key, element) -> {
IntKeyLongValue existingElement = target.get(key);
if (existingElement == null) {
target.put(key, new IntKeyLongValue(key, element.getValue()));
} else {
existingElement.addValue(element.getValue());
}
});
}
}
......@@ -13,40 +13,42 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.metrics;
import java.util.ArrayList;
import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
/**
* @author peng-yongsheng
*/
public class IntKeyLongValueArray extends ArrayList<IntKeyLongValue> implements StorageDataType {
public class IntKeyLongValueHashMap extends HashMap<Integer, IntKeyLongValue> implements StorageDataType {
public IntKeyLongValueArray(int initialCapacity) {
super(initialCapacity);
public IntKeyLongValueHashMap() {
super();
}
public IntKeyLongValueArray() {
super(30);
public IntKeyLongValueHashMap(int initialCapacity) {
super(initialCapacity);
}
public IntKeyLongValueArray(String data) {
public IntKeyLongValueHashMap(String data) {
super();
toObject(data);
}
@Override public String toStorageData() {
StringBuilder data = new StringBuilder();
for (int i = 0; i < this.size(); i++) {
List<Map.Entry<Integer, IntKeyLongValue>> list = new ArrayList<>(this.entrySet());
for (int i = 0; i < list.size(); i++) {
if (i == 0) {
data.append(this.get(i).toStorageData());
data.append(list.get(i).getValue().toStorageData());
} else {
data.append(Const.ARRAY_SPLIT).append(this.get(i).toStorageData());
data.append(Const.ARRAY_SPLIT).append(list.get(i).getValue().toStorageData());
}
}
return data.toString();
......@@ -54,19 +56,19 @@ public class IntKeyLongValueArray extends ArrayList<IntKeyLongValue> implements
@Override public void toObject(String data) {
String[] keyValues = data.split(Const.ARRAY_PARSER_SPLIT);
for (int i = 0; i < keyValues.length; i++) {
for (String keyValue : keyValues) {
IntKeyLongValue value = new IntKeyLongValue();
value.toObject(keyValues[i]);
this.add(value);
value.toObject(keyValue);
this.put(value.getKey(), value);
}
}
@Override public void copyFrom(Object source) {
IntKeyLongValueArray valueArray = (IntKeyLongValueArray)source;
valueArray.forEach(value -> {
IntKeyLongValueHashMap intKeyLongValueHashMap = (IntKeyLongValueHashMap)source;
intKeyLongValueHashMap.values().forEach(value -> {
IntKeyLongValue newValue = new IntKeyLongValue();
newValue.copyFrom(value);
this.add(newValue);
this.put(newValue.getKey(), newValue);
});
}
}
......@@ -29,8 +29,6 @@ import org.joda.time.format.*;
*/
public abstract class Metrics extends StreamData implements StorageData {
private static DateTimeFormatter TIME_BUCKET_MONTH_FORMATTER = DateTimeFormat.forPattern("yyyyMM");
public static final String TIME_BUCKET = "time_bucket";
public static final String ENTITY_ID = "entity_id";
......@@ -92,11 +90,12 @@ public abstract class Metrics extends StreamData implements StorageData {
} else if (isDayBucket()) {
return 24 * 60;
} else {
/**
/*
* In month time bucket status.
* Usually after {@link #toTimeBucketInMonth()} called.
*/
int dayOfMonth = TIME_BUCKET_MONTH_FORMATTER.parseLocalDate(timeBucket + "").getDayOfMonth();
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMM");
int dayOfMonth = formatter.parseLocalDate(timeBucket + "").getDayOfMonth();
return dayOfMonth * 24 * 60;
}
}
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.analysis.metrics;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.*;
import org.apache.skywalking.oap.server.core.query.sql.Function;
......@@ -33,36 +32,34 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
*
* @author wusheng, peng-yongsheng
*/
public abstract class PxxMetrics extends Metrics implements IntValueHolder {
public abstract class PxxMetrics extends GroupMetrics implements IntValueHolder {
protected static final String DETAIL_GROUP = "detail_group";
protected static final String VALUE = "value";
protected static final String PRECISION = "precision";
@Getter @Setter @Column(columnName = VALUE, isValue = true, function = Function.Avg) private int value;
@Getter @Setter @Column(columnName = PRECISION) private int precision;
@Getter @Setter @Column(columnName = DETAIL_GROUP) private IntKeyLongValueArray detailGroup;
@Getter @Setter @Column(columnName = DETAIL_GROUP) private IntKeyLongValueHashMap detailGroup;
private final int percentileRank;
@Getter private Map<Integer, IntKeyLongValue> detailIndex;
private boolean isCalculated;
public PxxMetrics(int percentileRank) {
this.percentileRank = percentileRank;
detailGroup = new IntKeyLongValueArray(30);
detailGroup = new IntKeyLongValueHashMap(30);
}
@Entrance
public final void combine(@SourceFrom int value, @Arg int precision) {
this.isCalculated = false;
this.precision = precision;
this.indexCheckAndInit();
int index = value / precision;
IntKeyLongValue element = detailIndex.get(index);
IntKeyLongValue element = detailGroup.get(index);
if (element == null) {
element = new IntKeyLongValue();
element.setKey(index);
element.setValue(1);
addElement(element);
element = new IntKeyLongValue(index, 1);
detailGroup.put(element.getKey(), element);
} else {
element.addValue(1);
}
......@@ -70,48 +67,27 @@ public abstract class PxxMetrics extends Metrics implements IntValueHolder {
@Override
public void combine(Metrics metrics) {
PxxMetrics pxxMetrics = (PxxMetrics)metrics;
this.indexCheckAndInit();
pxxMetrics.indexCheckAndInit();
this.isCalculated = false;
pxxMetrics.detailIndex.forEach((key, element) -> {
IntKeyLongValue existingElement = this.detailIndex.get(key);
if (existingElement == null) {
existingElement = new IntKeyLongValue();
existingElement.setKey(key);
existingElement.setValue(element.getValue());
addElement(element);
} else {
existingElement.addValue(element.getValue());
}
});
PxxMetrics pxxMetrics = (PxxMetrics)metrics;
combine(pxxMetrics.getDetailGroup(), this.detailGroup);
}
@Override
public final void calculate() {
Collections.sort(detailGroup);
int total = detailGroup.stream().mapToInt(element -> (int)element.getValue()).sum();
int roof = Math.round(total * percentileRank * 1.0f / 100);
int count = 0;
for (IntKeyLongValue element : detailGroup) {
count += element.getValue();
if (count >= roof) {
value = element.getKey() * precision;
return;
}
}
}
if (!isCalculated) {
int total = detailGroup.values().stream().mapToInt(element -> (int)element.getValue()).sum();
int roof = Math.round(total * percentileRank * 1.0f / 100);
private void addElement(IntKeyLongValue element) {
detailGroup.add(element);
detailIndex.put(element.getKey(), element);
}
private void indexCheckAndInit() {
if (detailIndex == null) {
detailIndex = new HashMap<>();
detailGroup.forEach(element -> detailIndex.put(element.getKey(), element));
int count = 0;
for (IntKeyLongValue element : detailGroup.values()) {
count += element.getValue();
if (count >= roof) {
value = element.getKey() * precision;
return;
}
}
}
}
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.core.analysis.metrics;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
......@@ -34,16 +33,15 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
* @author wusheng, peng-yongsheng
*/
@MetricsFunction(functionName = "thermodynamic")
public abstract class ThermodynamicMetrics extends Metrics {
public abstract class ThermodynamicMetrics extends GroupMetrics {
public static final String DETAIL_GROUP = "detail_group";
public static final String STEP = "step";
public static final String NUM_OF_STEPS = "num_of_steps";
@Getter @Setter @Column(columnName = STEP) private int step = 0;
@Getter @Setter @Column(columnName = NUM_OF_STEPS) private int numOfSteps = 0;
@Getter @Setter @Column(columnName = DETAIL_GROUP, isValue = true) private IntKeyLongValueArray detailGroup = new IntKeyLongValueArray(30);
private Map<Integer, IntKeyLongValue> detailIndex;
@Getter @Setter @Column(columnName = DETAIL_GROUP, isValue = true) private IntKeyLongValueHashMap detailGroup = new IntKeyLongValueHashMap(30);
/**
* Data will be grouped in
......@@ -64,18 +62,15 @@ public abstract class ThermodynamicMetrics extends Metrics {
this.numOfSteps = maxNumOfSteps;
}
indexCheckAndInit();
int index = value / step;
if (index > maxNumOfSteps) {
index = numOfSteps;
}
IntKeyLongValue element = detailIndex.get(index);
IntKeyLongValue element = detailGroup.get(index);
if (element == null) {
element = new IntKeyLongValue();
element.setKey(index);
element.setValue(1);
addElement(element);
element = new IntKeyLongValue(index, 1);
detailGroup.put(element.getKey(), element);
} else {
element.addValue(1);
}
......@@ -84,21 +79,7 @@ public abstract class ThermodynamicMetrics extends Metrics {
@Override
public void combine(Metrics metrics) {
ThermodynamicMetrics thermodynamicMetrics = (ThermodynamicMetrics)metrics;
this.indexCheckAndInit();
thermodynamicMetrics.indexCheckAndInit();
final ThermodynamicMetrics self = this;
thermodynamicMetrics.detailIndex.forEach((key, element) -> {
IntKeyLongValue existingElement = self.detailIndex.get(key);
if (existingElement == null) {
existingElement = new IntKeyLongValue();
existingElement.setKey(key);
existingElement.setValue(element.getValue());
self.addElement(element);
} else {
existingElement.addValue(element.getValue());
}
});
combine(thermodynamicMetrics.getDetailGroup(), this.detailGroup);
}
/**
......@@ -106,18 +87,5 @@ public abstract class ThermodynamicMetrics extends Metrics {
*/
@Override
public final void calculate() {
}
private void addElement(IntKeyLongValue element) {
detailGroup.add(element);
detailIndex.put(element.getKey(), element);
}
private void indexCheckAndInit() {
if (detailIndex == null) {
detailIndex = new HashMap<>();
detailGroup.forEach(element -> detailIndex.put(element.getKey(), element));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.core.analysis.metrics;
import org.junit.*;
/**
* @author peng-yongsheng
*/
public class IntKeyLongValueHashMapTestCase {
private IntKeyLongValueHashMap intKeyLongValueHashMap;
@Before
public void init() {
IntKeyLongValue v1 = new IntKeyLongValue(5, 500);
IntKeyLongValue v2 = new IntKeyLongValue(6, 600);
IntKeyLongValue v3 = new IntKeyLongValue(1, 100);
IntKeyLongValue v4 = new IntKeyLongValue(2, 200);
IntKeyLongValue v5 = new IntKeyLongValue(7, 700);
intKeyLongValueHashMap = new IntKeyLongValueHashMap();
intKeyLongValueHashMap.put(v1.getKey(), v1);
intKeyLongValueHashMap.put(v2.getKey(), v2);
intKeyLongValueHashMap.put(v3.getKey(), v3);
intKeyLongValueHashMap.put(v4.getKey(), v4);
intKeyLongValueHashMap.put(v5.getKey(), v5);
}
@Test
public void toStorageData() {
Assert.assertEquals("1,100|2,200|5,500|6,600|7,700", intKeyLongValueHashMap.toStorageData());
}
@Test
public void toObject() {
IntKeyLongValueHashMap intKeyLongValueHashMap = new IntKeyLongValueHashMap();
intKeyLongValueHashMap.toObject("1,100|2,200|5,500|6,600|7,700");
Assert.assertEquals(100, intKeyLongValueHashMap.get(1).getValue());
Assert.assertEquals(200, intKeyLongValueHashMap.get(2).getValue());
Assert.assertEquals(500, intKeyLongValueHashMap.get(5).getValue());
Assert.assertEquals(600, intKeyLongValueHashMap.get(6).getValue());
Assert.assertEquals(700, intKeyLongValueHashMap.get(7).getValue());
}
@Test
public void copyFrom() {
IntKeyLongValueHashMap intKeyLongValueHashMap = new IntKeyLongValueHashMap();
intKeyLongValueHashMap.copyFrom(this.intKeyLongValueHashMap);
Assert.assertEquals("1,100|2,200|5,500|6,600|7,700", intKeyLongValueHashMap.toStorageData());
}
}
......@@ -20,9 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.metrics;
import java.util.Map;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import org.junit.*;
/**
* @author wusheng
......@@ -49,7 +47,7 @@ public class ThermodynamicMetricsTest {
metricsMocker.combine(100, step, maxNumOfSteps);
metricsMocker.combine(100, step, maxNumOfSteps);
Map<Integer, IntKeyLongValue> index = Whitebox.getInternalState(metricsMocker, "detailIndex");
Map<Integer, IntKeyLongValue> index = metricsMocker.getDetailGroup();
Assert.assertEquals(4, index.size());
Assert.assertEquals(1, index.get(2).getValue());
......@@ -81,7 +79,7 @@ public class ThermodynamicMetricsTest {
metricsMocker.combine(metricsMocker1);
Map<Integer, IntKeyLongValue> index = Whitebox.getInternalState(metricsMocker, "detailIndex");
Map<Integer, IntKeyLongValue> index = metricsMocker.getDetailGroup();
Assert.assertEquals(4, index.size());
Assert.assertEquals(1, index.get(2).getValue());
......
......@@ -18,34 +18,21 @@
package org.apache.skywalking.oap.server.core.remote;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.inprocess.*;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.DuplicateProviderException;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.module.ProviderNotFoundException;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.*;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import static org.mockito.Mockito.*;
/**
* @author peng-yongsheng
......@@ -57,13 +44,19 @@ public class RemoteServiceHandlerTestCase {
@Test
public void callTest() throws DuplicateProviderException, ProviderNotFoundException, IOException {
final int streamDataClassId = 1;
final String testWorkerId = "mock-worker";
ModuleManagerTesting moduleManager = new ModuleManagerTesting();
ModuleDefineTesting moduleDefine = new ModuleDefineTesting();
moduleManager.put(CoreModule.NAME, moduleDefine);
WorkerInstancesService workerInstancesService = new WorkerInstancesService();
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceGetter.class, workerInstancesService);
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
TestWorker worker = new TestWorker(moduleManager);
workerInstancesService.put(testWorkerId, worker, TestRemoteData.class);
String serverName = InProcessServerBuilder.generateName();
MetricsCreator metricsCreator = mock(MetricsCreator.class);
when(metricsCreator.createCounter(any(), any(), any(), any())).thenReturn(new CounterMetrics() {
......@@ -75,13 +68,16 @@ public class RemoteServiceHandlerTestCase {
}
});
when(metricsCreator.createHistogramMetric(any(), any(), any(), any(), any())).thenReturn(
new HistogramMetrics() {
@Override public void observe(double value) {
when(metricsCreator.createHistogramMetric(any(), any(), any(), any())).thenReturn(new HistogramMetrics() {
@Override public Timer createTimer() {
return super.createTimer();
}
@Override public void observe(double value) {
}
}
);
});
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
......
......@@ -24,33 +24,21 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.core.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.testing.module.ModuleDefineTesting;
import org.apache.skywalking.oap.server.testing.module.ModuleManagerTesting;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.apache.skywalking.oap.server.testing.module.*;
import org.junit.*;
import static org.mockito.Mockito.*;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClientTestCase {
private final String nextWorkerId = "mock-worker";
private final String nextWorkerName = "mock-worker";
private ModuleManagerTesting moduleManager;
@Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
......@@ -65,6 +53,7 @@ public class GRPCRemoteClientTestCase {
moduleDefine.provider().registerServiceImplementation(IWorkerInstanceSetter.class, workerInstancesService);
TestWorker worker = new TestWorker(moduleManager);
workerInstancesService.put(nextWorkerName, worker, TestStreamData.class);
}
@Test
......@@ -79,6 +68,17 @@ public class GRPCRemoteClientTestCase {
}
});
when(metricsCreator.createHistogramMetric(any(), any(), any(), any())).thenReturn(new HistogramMetrics() {
@Override public Timer createTimer() {
return super.createTimer();
}
@Override public void observe(double value) {
}
});
ModuleDefineTesting telemetryModuleDefine = new ModuleDefineTesting();
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
......@@ -92,7 +92,7 @@ public class GRPCRemoteClientTestCase {
doReturn(grpcServerRule.getChannel()).when(remoteClient).getChannel();
for (int i = 0; i < 12; i++) {
remoteClient.push(nextWorkerId, new TestStreamData());
remoteClient.push(nextWorkerName, new TestStreamData());
}
TimeUnit.SECONDS.sleep(2);
......
......@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueArray;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.storage.model.DataTypeMapping;
/**
......@@ -35,7 +35,7 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "double";
} else if (String.class.equals(type)) {
return "keyword";
} else if (IntKeyLongValueArray.class.equals(type)) {
} else if (IntKeyLongValueHashMap.class.equals(type)) {
return "keyword";
} else if (byte[].class.equals(type)) {
return "binary";
......
......@@ -142,7 +142,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
numOfSteps = ((Number)source.get(ThermodynamicMetrics.NUM_OF_STEPS)).intValue() + 1;
String value = (String)source.get(ThermodynamicMetrics.DETAIL_GROUP);
IntKeyLongValueArray intKeyLongValues = new IntKeyLongValueArray(5);
IntKeyLongValueHashMap intKeyLongValues = new IntKeyLongValueHashMap(5);
intKeyLongValues.toObject(value);
List<Long> axisYValues = new ArrayList<>();
......@@ -150,7 +150,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
axisYValues.add(0L);
}
for (IntKeyLongValue intKeyLongValue : intKeyLongValues) {
for (IntKeyLongValue intKeyLongValue : intKeyLongValues.values()) {
axisYValues.set(intKeyLongValue.getKey(), intKeyLongValue.getValue());
}
......
......@@ -178,7 +178,7 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
String id = resultSet.getString("id");
numOfSteps = resultSet.getInt("num_of_steps") + 1;
String value = resultSet.getString("detail_group");
IntKeyLongValueArray intKeyLongValues = new IntKeyLongValueArray(5);
IntKeyLongValueHashMap intKeyLongValues = new IntKeyLongValueHashMap(5);
intKeyLongValues.toObject(value);
List<Long> axisYValues = new ArrayList<>();
......@@ -186,7 +186,7 @@ public class H2MetricsQueryDAO extends H2SQLExecutor implements IMetricsQueryDAO
axisYValues.add(0L);
}
for (IntKeyLongValue intKeyLongValue : intKeyLongValues) {
for (IntKeyLongValue intKeyLongValue : intKeyLongValues.values()) {
axisYValues.set(intKeyLongValue.getKey(), intKeyLongValue.getValue());
}
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueArray;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
......@@ -87,7 +87,7 @@ public class H2TableInstaller extends ModelInstaller {
return "DOUBLE";
} else if (String.class.equals(type)) {
return "VARCHAR(2000)";
} else if (IntKeyLongValueArray.class.equals(type)) {
} else if (IntKeyLongValueHashMap.class.equals(type)) {
return "VARCHAR(20000)";
} else if (byte[].class.equals(type)) {
return "VARCHAR(20000)";
......
......@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.sql.*;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueArray;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntKeyLongValueHashMap;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.StorageException;
......@@ -73,7 +73,7 @@ public class MySQLTableInstaller extends H2TableInstaller {
return "VARCHAR(300)";
}
return "VARCHAR(2000)";
} else if (IntKeyLongValueArray.class.equals(type)) {
} else if (IntKeyLongValueHashMap.class.equals(type)) {
return "MEDIUMTEXT";
} else if (byte[].class.equals(type)) {
return "MEDIUMTEXT";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册