提交 3b9d1b33 编写于 作者: P peng-yongsheng

Refactor Data.

上级 e11e4e5a
......@@ -129,5 +129,12 @@
<version>${project.version}</version>
</dependency>
<!-- queue provider -->
<!-- stream provider -->
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-provider</artifactId>
<version>${project.version}</version>
</dependency>
<!-- stream provider -->
</dependencies>
</project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.data;
/**
 * Immutable descriptor of a storage table column: the persisted column name
 * together with the merge {@link Operation} applied when a newly arrived row
 * is combined with an existing row that has the same id.
 *
 * @author peng-yongsheng
 */
public class Column {

    /** Column name exactly as it appears in the storage table. */
    private final String name;

    /** Strategy used to merge a new value with a previously stored one. */
    private final Operation operation;

    public Column(String columnName, Operation mergeOperation) {
        this.name = columnName;
        this.operation = mergeOperation;
    }

    public String getName() {
        return this.name;
    }

    public Operation getOperation() {
        return this.operation;
    }
}
......@@ -23,7 +23,7 @@ package org.skywalking.apm.collector.core.data;
*/
public class CommonTable {
    public static final String TABLE_TYPE = "type";
    // FIX: the refactor changed this string to "getId" — almost certainly an
    // IDE rename of the id() -> getId() accessor leaking into the constant.
    // The persisted column name must remain "id"; "getId" would silently
    // break every existing index/mapping keyed on this column.
    public static final String COLUMN_ID = "id";
    public static final String COLUMN_AGG = "agg";
    public static final String COLUMN_TIME_BUCKET = "time_bucket";
}
......@@ -21,7 +21,7 @@ package org.skywalking.apm.collector.core.data;
/**
* @author peng-yongsheng
*/
public class Data extends AbstractHashMessage {
public abstract class Data extends AbstractHashMessage {
private String[] dataStrings;
private Long[] dataLongs;
private Double[] dataDoubles;
......@@ -29,91 +29,91 @@ public class Data extends AbstractHashMessage {
private Boolean[] dataBooleans;
private byte[][] dataBytes;
public Data(String id, int stringCapacity, int longCapacity, int doubleCapacity,
int integerCapacity, int booleanCapacity, int byteCapacity) {
public Data(String id, Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
Column[] integerColumns, Column[] booleanColumns, Column[] byteColumns) {
super(id);
this.dataStrings = new String[stringCapacity];
this.dataStrings[0] = id;
this.dataLongs = new Long[longCapacity];
this.dataDoubles = new Double[doubleCapacity];
this.dataIntegers = new Integer[integerCapacity];
this.dataBooleans = new Boolean[booleanCapacity];
this.dataBytes = new byte[byteCapacity][];
this.dataStrings = new String[stringColumns.length];
this.dataLongs = new Long[longColumns.length];
this.dataDoubles = new Double[doubleColumns.length];
this.dataIntegers = new Integer[integerColumns.length];
this.dataBooleans = new Boolean[booleanColumns.length];
this.dataBytes = new byte[byteColumns.length][];
}
public void setDataString(int position, String value) {
protected void setDataString(int position, String value) {
dataStrings[position] = value;
}
public void setDataLong(int position, Long value) {
protected void setDataLong(int position, Long value) {
dataLongs[position] = value;
}
public void setDataDouble(int position, Double value) {
protected void setDataDouble(int position, Double value) {
dataDoubles[position] = value;
}
public void setDataInteger(int position, Integer value) {
protected void setDataInteger(int position, Integer value) {
dataIntegers[position] = value;
}
public void setDataBoolean(int position, Boolean value) {
protected void setDataBoolean(int position, Boolean value) {
dataBooleans[position] = value;
}
public void setDataBytes(int position, byte[] dataBytes) {
protected void setDataBytes(int position, byte[] dataBytes) {
this.dataBytes[position] = dataBytes;
}
public String getDataString(int position) {
protected String getDataString(int position) {
return dataStrings[position];
}
public Long getDataLong(int position) {
protected Long getDataLong(int position) {
return dataLongs[position];
}
public Double getDataDouble(int position) {
protected Double getDataDouble(int position) {
return dataDoubles[position];
}
public Integer getDataInteger(int position) {
protected Integer getDataInteger(int position) {
return dataIntegers[position];
}
public Boolean getDataBoolean(int position) {
protected Boolean getDataBoolean(int position) {
return dataBooleans[position];
}
public byte[] getDataBytes(int position) {
protected byte[] getDataBytes(int position) {
return dataBytes[position];
}
public String id() {
public String getId() {
return dataStrings[0];
}
@Override public String toString() {
StringBuilder dataStr = new StringBuilder();
dataStr.append("string: [");
for (int i = 0; i < dataStrings.length; i++) {
dataStr.append(dataStrings[i]).append(",");
for (String dataString : dataStrings) {
dataStr.append(dataString).append(",");
}
dataStr.append("], longs: [");
for (int i = 0; i < dataLongs.length; i++) {
dataStr.append(dataLongs[i]).append(",");
for (Long dataLong : dataLongs) {
dataStr.append(dataLong).append(",");
}
dataStr.append("], double: [");
for (int i = 0; i < dataDoubles.length; i++) {
dataStr.append(dataDoubles[i]).append(",");
for (Double dataDouble : dataDoubles) {
dataStr.append(dataDouble).append(",");
}
dataStr.append("], integer: [");
for (int i = 0; i < dataIntegers.length; i++) {
dataStr.append(dataIntegers[i]).append(",");
for (Integer dataInteger : dataIntegers) {
dataStr.append(dataInteger).append(",");
}
dataStr.append("], boolean: [");
for (int i = 0; i < dataBooleans.length; i++) {
dataStr.append(dataBooleans[i]).append(",");
for (Boolean dataBoolean : dataBooleans) {
dataStr.append(dataBoolean).append(",");
}
return dataStr.toString();
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.data;
/**
 * Describes the schema of a {@link Data} row: the ordered attributes, the
 * primitive bucket each one lives in, and the merge operation applied per
 * attribute. Subclasses declare their concrete layout via
 * {@link #attributeDefine()} and report its size via {@link #initialCapacity()}.
 *
 * @author peng-yongsheng
 */
public abstract class DataDefine {
    private Attribute[] attributes;
    // Per-type slot counts, derived once in initial() from the declared
    // attributes; used to size the typed arrays inside build(String).
    private int stringCapacity;
    private int longCapacity;
    private int doubleCapacity;
    private int integerCapacity;
    private int booleanCapacity;
    private int byteCapacity;

    public DataDefine() {
        initial();
    }

    /**
     * Collects the subclass attribute declarations, then counts how many
     * slots each attribute type needs.
     */
    private void initial() {
        attributes = new Attribute[initialCapacity()];
        attributeDefine();
        for (Attribute attribute : attributes) {
            if (AttributeType.STRING.equals(attribute.getType())) {
                stringCapacity++;
            } else if (AttributeType.LONG.equals(attribute.getType())) {
                longCapacity++;
            } else if (AttributeType.DOUBLE.equals(attribute.getType())) {
                doubleCapacity++;
            } else if (AttributeType.INTEGER.equals(attribute.getType())) {
                integerCapacity++;
            } else if (AttributeType.BOOLEAN.equals(attribute.getType())) {
                booleanCapacity++;
            } else if (AttributeType.BYTE.equals(attribute.getType())) {
                byteCapacity++;
            }
        }
    }

    /**
     * Registers one attribute at its declared position; called by subclasses
     * from attributeDefine().
     */
    public final void addAttribute(int position, Attribute attribute) {
        attributes[position] = attribute;
    }

    /** @return the remote-data mapping ordinal used to route serialized rows. */
    public abstract int remoteDataMappingId();

    /** @return total number of attributes this definition declares. */
    protected abstract int initialCapacity();

    /** Subclasses populate the attribute table via addAttribute(...). */
    protected abstract void attributeDefine();

    /** @return an empty Data row sized to hold exactly the declared attributes. */
    public final Data build(String id) {
        return new Data(id, stringCapacity, longCapacity, doubleCapacity, integerCapacity, booleanCapacity, byteCapacity);
    }

    /**
     * Merges {@code oldData} into {@code newData} attribute by attribute,
     * applying each attribute's declared Operation and writing the merged
     * value back into {@code newData}.
     */
    public final void mergeData(Data newData, Data oldData) {
        int stringPosition = 0;
        int longPosition = 0;
        int doublePosition = 0;
        int integerPosition = 0;
        int booleanPosition = 0;
        int bytePosition = 0;
        for (int i = 0; i < initialCapacity(); i++) {
            Attribute attribute = attributes[i];
            if (AttributeType.STRING.equals(attribute.getType())) {
                String stringData = attribute.getOperation().operate(newData.getDataString(stringPosition), oldData.getDataString(stringPosition));
                newData.setDataString(stringPosition, stringData);
                stringPosition++;
            } else if (AttributeType.LONG.equals(attribute.getType())) {
                Long longData = attribute.getOperation().operate(newData.getDataLong(longPosition), oldData.getDataLong(longPosition));
                newData.setDataLong(longPosition, longData);
                longPosition++;
            } else if (AttributeType.DOUBLE.equals(attribute.getType())) {
                Double doubleData = attribute.getOperation().operate(newData.getDataDouble(doublePosition), oldData.getDataDouble(doublePosition));
                newData.setDataDouble(doublePosition, doubleData);
                doublePosition++;
            } else if (AttributeType.INTEGER.equals(attribute.getType())) {
                Integer integerData = attribute.getOperation().operate(newData.getDataInteger(integerPosition), oldData.getDataInteger(integerPosition));
                newData.setDataInteger(integerPosition, integerData);
                integerPosition++;
            } else if (AttributeType.BOOLEAN.equals(attribute.getType())) {
                Boolean booleanData = attribute.getOperation().operate(newData.getDataBoolean(booleanPosition), oldData.getDataBoolean(booleanPosition));
                newData.setDataBoolean(booleanPosition, booleanData);
                booleanPosition++;
            } else if (AttributeType.BYTE.equals(attribute.getType())) {
                // BUG FIX: the old data was read with integerPosition instead of
                // bytePosition, so byte columns merged against the wrong slot
                // (or threw ArrayIndexOutOfBoundsException when the integer
                // cursor had advanced past the byte array's length).
                byte[] byteData = attribute.getOperation().operate(newData.getDataBytes(bytePosition), oldData.getDataBytes(bytePosition));
                newData.setDataBytes(bytePosition, byteData);
                bytePosition++;
            }
        }
    }
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.core.framework;
/**
 * Minimal callback contract: performs one unit of work on a single input.
 * Marked {@code @FunctionalInterface} so the compiler guarantees it stays a
 * single-abstract-method type usable as a lambda target.
 *
 * @param <Input> type of the element this executor consumes
 * @author peng-yongsheng
 */
@FunctionalInterface
public interface Executor<Input> {

    /**
     * Processes a single input element.
     *
     * @param input the element to process
     */
    void execute(Input input);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.service;
/**
 * Marker contract for exchanging InstPerformance rows between collector
 * nodes. Adds no methods of its own; it only pins the serialize/deserialize
 * operations inherited from {@code SerializableAndDeserialize} to the
 * InstPerformance data type.
 *
 * @param <RemoteData> wire-format message type
 * @param <Builder> builder type used to assemble the wire message
 * @author peng-yongsheng
 */
public interface InstPerformanceRemoteService<RemoteData, Builder> extends SerializableAndDeserialize<RemoteData, Builder> {
}
......@@ -19,16 +19,13 @@
package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public interface SerializableAndDeserialize<T, B> {
public interface SerializableAndDeserialize<RemoteData, Builder> {
RemoteDataMapping mapping();
void deserialize(RemoteData remoteData, Data data);
Data deserialize(T remoteData);
B serialize(Data data);
Builder serialize(Data data);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.grpc.service;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.service.InstPerformanceRemoteService;
/**
 * gRPC transport implementation of {@link InstPerformanceRemoteService}.
 *
 * NOTE(review): both methods are still stubs — {@code deserialize} is a
 * no-op and {@code serialize} returns {@code null}. Presumably a follow-up
 * change fills in the field mapping; confirm before relying on this class.
 *
 * @author peng-yongsheng
 */
public class InstPerformanceGRPCRemoteService implements InstPerformanceRemoteService<RemoteData, RemoteData.Builder> {

    // Stub: does not yet copy any fields from the wire message into data.
    @Override public void deserialize(RemoteData remoteData, Data data) {
    }

    // Stub: callers currently receive null instead of a populated builder.
    @Override public RemoteData.Builder serialize(Data data) {
        return null;
    }
}
......@@ -18,16 +18,13 @@
package org.skywalking.apm.collector.storage.base.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.DataDefine;
/**
* @author peng-yongsheng
*/
public interface IPersistenceDAO<I, U> {
Data get(String id, DataDefine dataDefine);
public interface IPersistenceDAO<Insert, Update, Data> {
Data get(String id);
I prepareBatchInsert(Data data);
Insert prepareBatchInsert(Data data);
U prepareBatchUpdate(Data data);
Update prepareBatchUpdate(Data data);
}
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.table.register.Application;
/**
* @author peng-yongsheng
......@@ -28,5 +28,5 @@ public interface IApplicationDAO {
int getMinApplicationId();
void save(Data data);
void save(Application application);
}
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.table.register.Instance;
/**
* @author peng-yongsheng
......@@ -30,7 +30,7 @@ public interface IInstanceDAO {
int getMinInstanceId();
void save(Data data);
void save(Instance instance);
void updateHeartbeatTime(int instanceId, long heartbeatTime);
}
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
......@@ -28,5 +28,5 @@ public interface IServiceNameDAO {
int getMinServiceId();
void save(Data data);
void save(ServiceName serviceName);
}
......@@ -25,5 +25,5 @@ import org.skywalking.apm.collector.storage.base.dao.DAO;
* @author peng-yongsheng
*/
public interface DAOService extends Service {
DAO get(Class<DAO> daoInterfaceClass);
DAO get(Class daoInterfaceClass);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.global;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
/**
 * Global-trace row model: links one segment id to the global trace id it
 * belongs to, within a time bucket. Values live in the typed slots supplied
 * by {@link Data}; the getters below map slot positions back to column
 * meanings.
 *
 * @author peng-yongsheng
 */
public class GlobalTrace extends Data {

    // String slot 0 is the row id (NonOperation: kept as-is on merge);
    // slots 1 and 2 are overwritten by the newest value (CoverOperation).
    private static final Column[] STRING_COLUMNS = {
        new Column(GlobalTraceTable.COLUMN_ID, new NonOperation()),
        new Column(GlobalTraceTable.COLUMN_SEGMENT_ID, new CoverOperation()),
        new Column(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, new CoverOperation()),
    };

    private static final Column[] LONG_COLUMNS = {
        new Column(GlobalTraceTable.COLUMN_TIME_BUCKET, new CoverOperation()),
    };

    // This table has no double, integer, boolean or byte columns.
    private static final Column[] DOUBLE_COLUMNS = {};
    private static final Column[] INTEGER_COLUMNS = {};
    private static final Column[] BOOLEAN_COLUMNS = {};
    private static final Column[] BYTE_COLUMNS = {};

    public GlobalTrace(String id) {
        super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
    }

    /** @return the segment id held in string slot 1 (slot 0 holds the row id). */
    public String getSegmentId() {
        return getDataString(1);
    }

    /** @return the global trace id held in string slot 2. */
    public String getGlobalTraceId() {
        return getDataString(2);
    }

    /** @return the time bucket held in long slot 0. */
    public Long getTimeBucket() {
        return getDataLong(0);
    }
}
......@@ -18,33 +18,78 @@
package org.skywalking.apm.collector.storage.table.instance;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class InstPerformanceDataDefine extends DataDefine {
public class InstPerformance extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.InstPerformance.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(InstPerformanceTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(InstPerformanceTable.COLUMN_COST_TOTAL, new AddOperation()),
new Column(InstPerformanceTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(InstPerformanceTable.COLUMN_APPLICATION_ID, new CoverOperation()),
new Column(InstPerformanceTable.COLUMN_INSTANCE_ID, new CoverOperation()),
new Column(InstPerformanceTable.COLUMN_CALLS, new AddOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public InstPerformance(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public Long getCostTotal() {
return getDataLong(0);
}
public void setCostTotal(Long costTotal) {
setDataLong(0, costTotal);
}
public Long getTimeBucket() {
return getDataLong(1);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(1, timeBucket);
}
public Integer getApplicationId() {
return getDataInteger(0);
}
public void setApplicationId(Integer applicationId) {
setDataInteger(0, applicationId);
}
public Integer getInstanceId() {
return getDataInteger(1);
}
public void setInstanceId(Integer instanceId) {
setDataInteger(1, instanceId);
}
@Override protected int initialCapacity() {
return 6;
public Integer getCalls() {
return getDataInteger(2);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(InstPerformanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(InstPerformanceTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(InstPerformanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(InstPerformanceTable.COLUMN_CALLS, AttributeType.INTEGER, new AddOperation()));
addAttribute(4, new Attribute(InstPerformanceTable.COLUMN_COST_TOTAL, AttributeType.LONG, new AddOperation()));
addAttribute(5, new Attribute(InstPerformanceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
public void setCalls(Integer calls) {
setDataInteger(2, calls);
}
}
......@@ -18,31 +18,49 @@
package org.skywalking.apm.collector.storage.table.jvm;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class CpuMetricDataDefine extends DataDefine {
public class CpuMetric extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.CpuMetric.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(CpuMetricTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(CpuMetricTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {
new Column(CpuMetricTable.COLUMN_USAGE_PERCENT, new AddOperation()),
};
private static final Column[] INTEGER_COLUMNS = {
new Column(CpuMetricTable.COLUMN_INSTANCE_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public CpuMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public Integer getInstanceId() {
return getDataInteger(0);
}
@Override protected int initialCapacity() {
return 4;
public Double getUsagePercent() {
return getDataDouble(0);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(CpuMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(CpuMetricTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(CpuMetricTable.COLUMN_USAGE_PERCENT, AttributeType.DOUBLE, new AddOperation()));
addAttribute(3, new Attribute(CpuMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
public Long getTimeBucket() {
return getDataLong(0);
}
}
......@@ -18,32 +18,58 @@
package org.skywalking.apm.collector.storage.table.jvm;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class GCMetricDataDefine extends DataDefine {
public class GCMetric extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.GCMetric.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(GCMetricTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(GCMetricTable.COLUMN_COUNT, new CoverOperation()),
new Column(GCMetricTable.COLUMN_TIME, new CoverOperation()),
new Column(GCMetricTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {
};
private static final Column[] INTEGER_COLUMNS = {
new Column(GCMetricTable.COLUMN_INSTANCE_ID, new CoverOperation()),
new Column(GCMetricTable.COLUMN_PHRASE, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public GCMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public Long getCount() {
return getDataLong(0);
}
public Long getTime() {
return getDataLong(1);
}
public Long getTimeBucket() {
return getDataLong(2);
}
@Override protected int initialCapacity() {
return 6;
public Integer getInstanceId() {
return getDataInteger(0);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(GCMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(GCMetricTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(GCMetricTable.COLUMN_PHRASE, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(GCMetricTable.COLUMN_COUNT, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(GCMetricTable.COLUMN_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(GCMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
public Integer getPhrase() {
return getDataInteger(1);
}
}
......@@ -16,53 +16,71 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.register;
package org.skywalking.apm.collector.storage.table.jvm;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class ServiceNameDataDefine extends DataDefine {
public class MemoryMetric extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.ServiceName.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(MemoryMetricTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(MemoryMetricTable.COLUMN_INIT, new CoverOperation()),
new Column(MemoryMetricTable.COLUMN_MAX, new CoverOperation()),
new Column(MemoryMetricTable.COLUMN_USED, new CoverOperation()),
new Column(MemoryMetricTable.COLUMN_COMMITTED, new CoverOperation()),
new Column(MemoryMetricTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {
};
private static final Column[] INTEGER_COLUMNS = {
new Column(MemoryMetricTable.COLUMN_INSTANCE_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {
new Column(MemoryMetricTable.COLUMN_IS_HEAP, new CoverOperation()),
};
private static final Column[] BYTE_COLUMNS = {};
public MemoryMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
@Override protected int initialCapacity() {
return 4;
public Long getInit() {
return getDataLong(0);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceNameTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceNameTable.COLUMN_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(ServiceNameTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(ServiceNameTable.COLUMN_SERVICE_ID, AttributeType.INTEGER, new CoverOperation()));
public Long getMax() {
return getDataLong(1);
}
public enum ServiceName {
INSTANCE;
public Long getUsed() {
return getDataLong(2);
}
public String getId(Data data) {
return data.getDataString(0);
}
public Long getCommitted() {
return getDataLong(3);
}
public String getServiceName(Data data) {
return data.getDataString(1);
}
public Long getTimeBucket() {
return getDataLong(4);
}
public int getApplicationId(Data data) {
return data.getDataInteger(0);
}
public Boolean getIsHeap() {
return getDataBoolean(0);
}
public int getServiceId(Data data) {
return data.getDataInteger(1);
}
public Integer getInstanceId() {
return getDataInteger(0);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.jvm;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
 * Schema definition for the memory_metric table (legacy DataDefine style):
 * id, instance id, heap flag, four memory gauges, and the time bucket.
 *
 * @author peng-yongsheng
 */
public class MemoryMetricDataDefine extends DataDefine {

    @Override public int remoteDataMappingId() {
        return RemoteDataMapping.MemoryMetric.ordinal();
    }

    @Override protected int initialCapacity() {
        // Must match the number of addAttribute(...) calls below.
        return 8;
    }

    @Override protected void attributeDefine() {
        addAttribute(0, new Attribute(MemoryMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
        // FIX: MemoryMetricTable renamed COLUMN_APPLICATION_INSTANCE_ID to
        // COLUMN_INSTANCE_ID in this change; reference the surviving constant
        // so this class still compiles against the updated table definition.
        addAttribute(1, new Attribute(MemoryMetricTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
        addAttribute(2, new Attribute(MemoryMetricTable.COLUMN_IS_HEAP, AttributeType.BOOLEAN, new CoverOperation()));
        addAttribute(3, new Attribute(MemoryMetricTable.COLUMN_INIT, AttributeType.LONG, new CoverOperation()));
        addAttribute(4, new Attribute(MemoryMetricTable.COLUMN_MAX, AttributeType.LONG, new CoverOperation()));
        addAttribute(5, new Attribute(MemoryMetricTable.COLUMN_USED, AttributeType.LONG, new CoverOperation()));
        addAttribute(6, new Attribute(MemoryMetricTable.COLUMN_COMMITTED, AttributeType.LONG, new CoverOperation()));
        addAttribute(7, new Attribute(MemoryMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
    }
}
......@@ -25,7 +25,7 @@ import org.skywalking.apm.collector.core.data.CommonTable;
*/
public class MemoryMetricTable extends CommonTable {
public static final String TABLE = "memory_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_INSTANCE_ID = "instance_id";
public static final String COLUMN_IS_HEAP = "is_heap";
public static final String COLUMN_INIT = "init";
public static final String COLUMN_MAX = "max";
......
......@@ -18,34 +18,68 @@
package org.skywalking.apm.collector.storage.table.jvm;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class MemoryPoolMetricDataDefine extends DataDefine {
public class MemoryPoolMetric extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.MemoryPoolMetric.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(MemoryPoolMetricTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(MemoryPoolMetricTable.COLUMN_INIT, new CoverOperation()),
new Column(MemoryPoolMetricTable.COLUMN_MAX, new CoverOperation()),
new Column(MemoryPoolMetricTable.COLUMN_USED, new CoverOperation()),
new Column(MemoryPoolMetricTable.COLUMN_COMMITTED, new CoverOperation()),
new Column(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {
};
private static final Column[] INTEGER_COLUMNS = {
new Column(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, new CoverOperation()),
new Column(MemoryPoolMetricTable.COLUMN_POOL_TYPE, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public MemoryPoolMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public Long getInit() {
return getDataLong(0);
}
public Long getMax() {
return getDataLong(1);
}
public Long getUsed() {
return getDataLong(2);
}
public Long getCommitted() {
return getDataLong(3);
}
public Long getTimeBucket() {
return getDataLong(4);
}
@Override protected int initialCapacity() {
return 8;
public Integer getInstanceId() {
return getDataInteger(0);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(MemoryPoolMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(MemoryPoolMetricTable.COLUMN_POOL_TYPE, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(MemoryPoolMetricTable.COLUMN_INIT, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(MemoryPoolMetricTable.COLUMN_MAX, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(MemoryPoolMetricTable.COLUMN_USED, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(MemoryPoolMetricTable.COLUMN_COMMITTED, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
public Integer getPoolType() {
return getDataInteger(1);
}
}
......@@ -18,32 +18,75 @@
package org.skywalking.apm.collector.storage.table.node;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class NodeComponentDataDefine extends DataDefine {
public class NodeComponent extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.NodeComponent.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(NodeComponentTable.COLUMN_ID, new NonOperation()),
new Column(NodeComponentTable.COLUMN_COMPONENT_NAME, new CoverOperation()),
new Column(NodeComponentTable.COLUMN_PEER, new CoverOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(NodeComponentTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(NodeComponentTable.COLUMN_COMPONENT_ID, new CoverOperation()),
new Column(NodeComponentTable.COLUMN_PEER_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public NodeComponent(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public String getComponentName() {
return getDataString(1);
}
public void setComponentName(String componentName) {
setDataString(1, componentName);
}
public String getPeer() {
return getDataString(1);
}
public void setPeer(String peer) {
setDataString(2, peer);
}
public Long getTimeBucket() {
return getDataLong(0);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(0, timeBucket);
}
public Integer getComponentId() {
return getDataInteger(0);
}
public void setComponentId(Integer componentId) {
setDataInteger(0, componentId);
}
@Override protected int initialCapacity() {
return 6;
public Integer getPeerId() {
return getDataInteger(1);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeComponentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeComponentTable.COLUMN_COMPONENT_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(NodeComponentTable.COLUMN_COMPONENT_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(NodeComponentTable.COLUMN_PEER_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(4, new Attribute(NodeComponentTable.COLUMN_PEER, AttributeType.STRING, new CoverOperation()));
addAttribute(5, new Attribute(NodeComponentTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
public void setPeerId(Integer peerId) {
setDataInteger(1, peerId);
}
}
......@@ -18,31 +18,34 @@
package org.skywalking.apm.collector.storage.table.node;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class NodeMappingDataDefine extends DataDefine {
public class NodeMapping extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.NodeMapping.ordinal();
}
private static final Column[] STRING_COLUMNS = {
new Column(NodeMappingTable.COLUMN_ID, new NonOperation()),
new Column(NodeMappingTable.COLUMN_ADDRESS, new CoverOperation()),
};
@Override protected int initialCapacity() {
return 5;
}
private static final Column[] LONG_COLUMNS = {
new Column(NodeMappingTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(NodeMappingTable.COLUMN_APPLICATION_ID, new CoverOperation()),
new Column(NodeMappingTable.COLUMN_ADDRESS_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(NodeMappingTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(NodeMappingTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(NodeMappingTable.COLUMN_ADDRESS_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(NodeMappingTable.COLUMN_ADDRESS, AttributeType.STRING, new CoverOperation()));
addAttribute(4, new Attribute(NodeMappingTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
public NodeMapping(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.noderef;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
/**
* @author peng-yongsheng
*/
/**
 * Stream data entity for a reference between two application nodes
 * (front application calling behind application/peer) within a time bucket.
 *
 * <p>The integer counters (s1Lte/s3Lte/s5Lte/s5Gt/summary/error) use
 * {@code AddOperation} and so are accumulated on merge; identity fields use
 * {@code NonOperation}. The s*-named counters are presumably latency buckets
 * keyed off the column-name thresholds — confirm against the aggregation worker.
 *
 * <p>All accessor indices refer to positions within the typed column arrays.
 *
 * @author peng-yongsheng
 */
public class NodeReference extends Data {

    private static final Column[] STRING_COLUMNS = {
        new Column(NodeReferenceTable.COLUMN_ID, new NonOperation()),
        new Column(NodeReferenceTable.COLUMN_BEHIND_PEER, new NonOperation()),
    };

    private static final Column[] LONG_COLUMNS = {
        new Column(NodeReferenceTable.COLUMN_TIME_BUCKET, new NonOperation()),
    };

    private static final Column[] DOUBLE_COLUMNS = {};

    private static final Column[] INTEGER_COLUMNS = {
        new Column(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, new NonOperation()),
        new Column(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, new NonOperation()),
        new Column(NodeReferenceTable.COLUMN_S1_LTE, new AddOperation()),
        new Column(NodeReferenceTable.COLUMN_S3_LTE, new AddOperation()),
        new Column(NodeReferenceTable.COLUMN_S5_LTE, new AddOperation()),
        new Column(NodeReferenceTable.COLUMN_S5_GT, new AddOperation()),
        new Column(NodeReferenceTable.COLUMN_SUMMARY, new AddOperation()),
        new Column(NodeReferenceTable.COLUMN_ERROR, new AddOperation()),
    };

    private static final Column[] BOOLEAN_COLUMNS = {};

    private static final Column[] BYTE_COLUMNS = {};

    public NodeReference(String id) {
        super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
    }

    // --- identity fields -------------------------------------------------

    public Integer getFrontApplicationId() {
        return getDataInteger(0);
    }

    public void setFrontApplicationId(Integer id) {
        setDataInteger(0, id);
    }

    public Integer getBehindApplicationId() {
        return getDataInteger(1);
    }

    public void setBehindApplicationId(Integer id) {
        setDataInteger(1, id);
    }

    public String getBehindPeer() {
        return getDataString(1);
    }

    public void setBehindPeer(String peer) {
        setDataString(1, peer);
    }

    public Long getTimeBucket() {
        return getDataLong(0);
    }

    public void setTimeBucket(Long bucket) {
        setDataLong(0, bucket);
    }

    // --- accumulated counters (INTEGER_COLUMNS indices 2..7) -------------

    public Integer getS1Lte() {
        return getDataInteger(2);
    }

    public void setS1Lte(Integer count) {
        setDataInteger(2, count);
    }

    public Integer getS3Lte() {
        return getDataInteger(3);
    }

    public void setS3Lte(Integer count) {
        setDataInteger(3, count);
    }

    public Integer getS5Lte() {
        return getDataInteger(4);
    }

    public void setS5Lte(Integer count) {
        setDataInteger(4, count);
    }

    public Integer getS5Gt() {
        return getDataInteger(5);
    }

    public void setS5Gt(Integer count) {
        setDataInteger(5, count);
    }

    public Integer getSummary() {
        return getDataInteger(6);
    }

    public void setSummary(Integer count) {
        setDataInteger(6, count);
    }

    public Integer getError() {
        return getDataInteger(7);
    }

    public void setError(Integer count) {
        setDataInteger(7, count);
    }
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.noderef;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
/**
 * Legacy attribute-table definition for the node-reference row layout.
 *
 * <p>Each attribute maps a position to a column name, a value type and a merge
 * operation. Positions 4..9 use {@code AddOperation} (presumably accumulated on
 * merge — confirm against the operation implementations); identity fields use
 * {@code NonOperation}.
 *
 * @author peng-yongsheng
 */
public class NodeReferenceDataDefine extends DataDefine {

    // Ordinal identifying this data type when shipped between collector nodes.
    @Override public int remoteDataMappingId() {
        return RemoteDataMapping.NodeReference.ordinal();
    }

    // Must equal the number of attributes registered in attributeDefine() (0..10).
    @Override protected int initialCapacity() {
        return 11;
    }

    // Attribute table: position -> (column, type, merge operation).
    @Override protected void attributeDefine() {
        addAttribute(0, new Attribute(NodeReferenceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
        addAttribute(1, new Attribute(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
        addAttribute(2, new Attribute(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, AttributeType.INTEGER, new NonOperation()));
        addAttribute(3, new Attribute(NodeReferenceTable.COLUMN_BEHIND_PEER, AttributeType.STRING, new NonOperation()));
        addAttribute(4, new Attribute(NodeReferenceTable.COLUMN_S1_LTE, AttributeType.INTEGER, new AddOperation()));
        addAttribute(5, new Attribute(NodeReferenceTable.COLUMN_S3_LTE, AttributeType.INTEGER, new AddOperation()));
        addAttribute(6, new Attribute(NodeReferenceTable.COLUMN_S5_LTE, AttributeType.INTEGER, new AddOperation()));
        addAttribute(7, new Attribute(NodeReferenceTable.COLUMN_S5_GT, AttributeType.INTEGER, new AddOperation()));
        addAttribute(8, new Attribute(NodeReferenceTable.COLUMN_SUMMARY, AttributeType.INTEGER, new AddOperation()));
        addAttribute(9, new Attribute(NodeReferenceTable.COLUMN_ERROR, AttributeType.INTEGER, new AddOperation()));
        addAttribute(10, new Attribute(NodeReferenceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new NonOperation()));
    }
}
......@@ -16,32 +16,41 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.global;
package org.skywalking.apm.collector.storage.table.register;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class GlobalTraceDataDefine extends DataDefine {
public class Application extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.GlobalTrace.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(ApplicationTable.COLUMN_ID, new NonOperation()),
new Column(ApplicationTable.COLUMN_APPLICATION_CODE, new CoverOperation()),
};
private static final Column[] LONG_COLUMNS = {};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(ApplicationTable.COLUMN_APPLICATION_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public Application(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
@Override protected int initialCapacity() {
return 4;
public String getApplicationCode() {
return getDataString(1);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(GlobalTraceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(GlobalTraceTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(GlobalTraceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
public int getApplicationId() {
return getDataInteger(0);
}
}
......@@ -18,66 +18,88 @@
package org.skywalking.apm.collector.storage.table.register;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class InstanceDataDefine extends DataDefine {
public class Instance extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.Instance.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(InstanceTable.COLUMN_ID, new NonOperation()),
new Column(InstanceTable.COLUMN_AGENT_UUID, new CoverOperation()),
new Column(InstanceTable.COLUMN_OS_INFO, new CoverOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(InstanceTable.COLUMN_REGISTER_TIME, new CoverOperation()),
new Column(InstanceTable.COLUMN_HEARTBEAT_TIME, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(InstanceTable.COLUMN_APPLICATION_ID, new CoverOperation()),
new Column(InstanceTable.COLUMN_INSTANCE_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public Instance(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public String getId() {
return getDataString(0);
}
public int getApplicationId() {
return getDataInteger(0);
}
public void setApplicationId(Integer applicationId) {
setDataInteger(0, applicationId);
}
@Override protected int initialCapacity() {
return 7;
public String getAgentUUID() {
return getDataString(1);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_AGENT_UUID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(InstanceTable.COLUMN_REGISTER_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(5, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(InstanceTable.COLUMN_OS_INFO, AttributeType.STRING, new CoverOperation()));
public void setAgentUUID(String agentUUID) {
setDataString(1, agentUUID);
}
public enum Instance {
INSTANCE;
public long getRegisterTime() {
return getDataLong(0);
}
public String getId(Data data) {
return data.getDataString(0);
}
public void setRegisterTime(Long registerTime) {
setDataLong(0, registerTime);
}
public int getApplicationId(Data data) {
return data.getDataInteger(0);
}
public int getInstanceId() {
return getDataInteger(1);
}
public String getAgentUUID(Data data) {
return data.getDataString(1);
}
public void setInstanceId(Integer instanceId) {
setDataInteger(1, instanceId);
}
public long getRegisterTime(Data data) {
return data.getDataLong(0);
}
public long getHeartBeatTime() {
return getDataLong(1);
}
public int getInstanceId(Data data) {
return data.getDataInteger(1);
}
public void setHeartBeatTime(Long heartBeatTime) {
setDataLong(1, heartBeatTime);
}
public long getHeartBeatTime(Data data) {
return data.getDataLong(1);
}
public String getOsInfo() {
return getDataString(2);
}
public String getOsInfo(Data data) {
return data.getDataString(2);
}
public void setOsInfo(String osInfo) {
setDataString(2, osInfo);
}
}
......@@ -18,46 +18,44 @@
package org.skywalking.apm.collector.storage.table.register;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class ApplicationDataDefine extends DataDefine {
public class ServiceName extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.Application.ordinal();
}
private static final Column[] STRING_COLUMNS = {
new Column(ServiceNameTable.COLUMN_ID, new NonOperation()),
new Column(ServiceNameTable.COLUMN_SERVICE_NAME, new CoverOperation()),
};
@Override protected int initialCapacity() {
return 3;
}
private static final Column[] LONG_COLUMNS = {};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(ServiceNameTable.COLUMN_APPLICATION_ID, new CoverOperation()),
new Column(ServiceNameTable.COLUMN_SERVICE_ID, new CoverOperation()),
};
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ApplicationTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ApplicationTable.COLUMN_APPLICATION_CODE, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(ApplicationTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
}
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public enum Application {
INSTANCE;
public ServiceName(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public String getId(Data data) {
return data.getDataString(0);
}
public String getServiceName() {
return getDataString(1);
}
public String getApplicationCode(Data data) {
return data.getDataString(1);
}
public int getApplicationId() {
return getDataInteger(0);
}
public int getApplicationId(Data data) {
return data.getDataInteger(1);
}
public int getServiceId() {
return getDataInteger(1);
}
}
......@@ -18,28 +18,36 @@
package org.skywalking.apm.collector.storage.table.segment;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class SegmentDataDefine extends DataDefine {
public class Segment extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.Segment.ordinal();
}
private static final Column[] STRING_COLUMNS = {
new Column(SegmentTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {
new Column(SegmentTable.COLUMN_DATA_BINARY, new CoverOperation()),
};
@Override protected int initialCapacity() {
return 2;
public Segment(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(SegmentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(SegmentTable.COLUMN_DATA_BINARY, AttributeType.BYTE, new CoverOperation()));
public byte[] getDataBinary() {
return getDataBytes(0);
}
}
......@@ -18,35 +18,71 @@
package org.skywalking.apm.collector.storage.table.segment;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class SegmentCostDataDefine extends DataDefine {
public class SegmentCost extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.SegmentCost.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(SegmentCostTable.COLUMN_ID, new NonOperation()),
new Column(SegmentCostTable.COLUMN_SEGMENT_ID, new CoverOperation()),
new Column(SegmentCostTable.COLUMN_SERVICE_NAME, new CoverOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(SegmentCostTable.COLUMN_COST, new CoverOperation()),
new Column(SegmentCostTable.COLUMN_START_TIME, new CoverOperation()),
new Column(SegmentCostTable.COLUMN_END_TIME, new CoverOperation()),
new Column(SegmentCostTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(SegmentCostTable.COLUMN_APPLICATION_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {
new Column(SegmentCostTable.COLUMN_IS_ERROR, new CoverOperation()),
};
private static final Column[] BYTE_COLUMNS = {};
public SegmentCost(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public String getSegmentId() {
return getDataString(1);
}
public String getServiceName() {
return getDataString(2);
}
public Long getCost() {
return getDataLong(0);
}
public Long getStartTime() {
return getDataLong(1);
}
public Long getEndTime() {
return getDataLong(2);
}
public Long getTimeBucket() {
return getDataLong(3);
}
@Override protected int initialCapacity() {
return 9;
public Integer getApplicationId() {
return getDataInteger(0);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(SegmentCostTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(SegmentCostTable.COLUMN_SEGMENT_ID, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(SegmentCostTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(SegmentCostTable.COLUMN_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(4, new Attribute(SegmentCostTable.COLUMN_COST, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(SegmentCostTable.COLUMN_START_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(SegmentCostTable.COLUMN_END_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(SegmentCostTable.COLUMN_IS_ERROR, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(8, new Attribute(SegmentCostTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new NonOperation()));
public Boolean getIsError() {
return getDataBoolean(0);
}
}
......@@ -18,32 +18,75 @@
package org.skywalking.apm.collector.storage.table.service;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
* @author peng-yongsheng
*/
public class ServiceEntryDataDefine extends DataDefine {
public class ServiceEntry extends Data {
@Override public int remoteDataMappingId() {
return RemoteDataMapping.ServiceEntry.ordinal();
private static final Column[] STRING_COLUMNS = {
new Column(ServiceEntryTable.COLUMN_ID, new NonOperation()),
new Column(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, new CoverOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(ServiceEntryTable.COLUMN_REGISTER_TIME, new NonOperation()),
new Column(ServiceEntryTable.COLUMN_NEWEST_TIME, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(ServiceEntryTable.COLUMN_APPLICATION_ID, new CoverOperation()),
new Column(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public ServiceEntry(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public String getEntryServiceName() {
return getDataString(1);
}
public void setEntryServiceName(String entryServiceName) {
setDataString(1, entryServiceName);
}
public Long getRegisterTime() {
return getDataLong(0);
}
public void setRegisterTime(Long registerTime) {
setDataLong(0, registerTime);
}
public Long getNewestTime() {
return getDataLong(1);
}
public void setNewestTime(Long newestTime) {
setDataLong(1, newestTime);
}
public Integer getApplicationId() {
return getDataInteger(0);
}
public void setApplicationId(Integer applicationId) {
setDataInteger(0, applicationId);
}
@Override protected int initialCapacity() {
return 6;
public Integer getEntryServiceId() {
return getDataInteger(1);
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(ServiceEntryTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceEntryTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(4, new Attribute(ServiceEntryTable.COLUMN_REGISTER_TIME, AttributeType.LONG, new NonOperation()));
addAttribute(5, new Attribute(ServiceEntryTable.COLUMN_NEWEST_TIME, AttributeType.LONG, new CoverOperation()));
public void setEntryServiceId(Integer entryServiceId) {
setDataInteger(1, entryServiceId);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.serviceref;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
/**
 * Stream data holder for a service-to-service reference (a caller/callee edge
 * with latency-bucket counters). Values are stored positionally in typed slots
 * declared by the column arrays below; every accessor's slot index must match
 * the position of its column inside the corresponding array.
 *
 * @author peng-yongsheng
 */
public class ServiceReference extends Data {

    // String slots: 0 = row id, 1..3 = entry/front/behind service names.
    private static final Column[] STRING_COLUMNS = {
        new Column(ServiceReferenceTable.COLUMN_ID, new NonOperation()),
        new Column(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, new NonOperation()),
        new Column(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, new NonOperation()),
        new Column(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, new NonOperation()),
    };

    // Long slots 0..7: latency buckets, summaries and the time bucket.
    // Counters carry AddOperation, the time bucket CoverOperation — the
    // merge semantics are defined in those operator classes.
    private static final Column[] LONG_COLUMNS = {
        new Column(ServiceReferenceTable.COLUMN_S1_LTE, new AddOperation()),
        new Column(ServiceReferenceTable.COLUMN_S3_LTE, new AddOperation()),
        new Column(ServiceReferenceTable.COLUMN_S5_LTE, new AddOperation()),
        new Column(ServiceReferenceTable.COLUMN_S5_GT, new AddOperation()),
        new Column(ServiceReferenceTable.COLUMN_SUMMARY, new AddOperation()),
        new Column(ServiceReferenceTable.COLUMN_ERROR, new AddOperation()),
        new Column(ServiceReferenceTable.COLUMN_COST_SUMMARY, new AddOperation()),
        new Column(ServiceReferenceTable.COLUMN_TIME_BUCKET, new CoverOperation()),
    };

    // No double-valued columns for this table.
    private static final Column[] DOUBLE_COLUMNS = {};

    // Integer slots: 0..2 = entry/front/behind service ids.
    private static final Column[] INTEGER_COLUMNS = {
        new Column(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, new NonOperation()),
        new Column(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, new NonOperation()),
        new Column(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, new NonOperation()),
    };

    // No boolean or byte columns for this table.
    private static final Column[] BOOLEAN_COLUMNS = {};
    private static final Column[] BYTE_COLUMNS = {};

    /**
     * @param id row identifier of this service reference record
     */
    public ServiceReference(String id) {
        super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
    }

    // ---- service names (string slots 1..3) ----

    public String getEntryServiceName() {
        return getDataString(1);
    }

    public void setEntryServiceName(String entryServiceName) {
        setDataString(1, entryServiceName);
    }

    public String getFrontServiceName() {
        return getDataString(2);
    }

    public void setFrontServiceName(String frontServiceName) {
        setDataString(2, frontServiceName);
    }

    public String getBehindServiceName() {
        return getDataString(3);
    }

    public void setBehindServiceName(String behindServiceName) {
        setDataString(3, behindServiceName);
    }

    // ---- service ids (integer slots 0..2) ----

    public Integer getEntryServiceId() {
        return getDataInteger(0);
    }

    public void setEntryServiceId(Integer entryServiceId) {
        setDataInteger(0, entryServiceId);
    }

    public Integer getFrontServiceId() {
        return getDataInteger(1);
    }

    public void setFrontServiceId(Integer frontServiceId) {
        setDataInteger(1, frontServiceId);
    }

    public Integer getBehindServiceId() {
        return getDataInteger(2);
    }

    public void setBehindServiceId(Integer behindServiceId) {
        setDataInteger(2, behindServiceId);
    }

    // ---- latency-bucket counters and summaries (long slots 0..6) ----

    public Long getS1Lte() {
        return getDataLong(0);
    }

    public void setS1Lte(Long s1Lte) {
        setDataLong(0, s1Lte);
    }

    public Long getS3Lte() {
        return getDataLong(1);
    }

    public void setS3Lte(Long s3Lte) {
        setDataLong(1, s3Lte);
    }

    public Long getS5Lte() {
        return getDataLong(2);
    }

    public void setS5Lte(Long s5Lte) {
        setDataLong(2, s5Lte);
    }

    public Long getS5Gt() {
        return getDataLong(3);
    }

    public void setS5Gt(Long s5Gt) {
        setDataLong(3, s5Gt);
    }

    public Long getSummary() {
        return getDataLong(4);
    }

    public void setSummary(Long summary) {
        setDataLong(4, summary);
    }

    public Long getError() {
        return getDataLong(5);
    }

    public void setError(Long error) {
        setDataLong(5, error);
    }

    public Long getCostSummary() {
        return getDataLong(6);
    }

    public void setCostSummary(Long costSummary) {
        setDataLong(6, costSummary);
    }

    // ---- time bucket (long slot 7) ----

    public Long getTimeBucket() {
        return getDataLong(7);
    }

    public void setTimeBucket(Long timeBucket) {
        setDataLong(7, timeBucket);
    }
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.serviceref;
import org.skywalking.apm.collector.core.data.Attribute;
import org.skywalking.apm.collector.core.data.AttributeType;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
import org.skywalking.apm.collector.remote.RemoteDataMapping;
/**
 * Attribute schema definition for service reference stream data: fifteen
 * attributes covering the identity columns, the latency-bucket counters and
 * the time bucket. Attribute positions are assigned sequentially and must
 * stay in this exact order.
 *
 * @author peng-yongsheng
 */
public class ServiceReferenceDataDefine extends DataDefine {

    /** Remote mapping id used when this data is serialized between collector nodes. */
    @Override public int remoteDataMappingId() {
        return RemoteDataMapping.ServiceReference.ordinal();
    }

    /** Total number of attributes registered by {@link #attributeDefine()}. */
    @Override protected int initialCapacity() {
        return 15;
    }

    @Override protected void attributeDefine() {
        int position = 0;
        // Identity columns keep their stored value on merge (NonOperation).
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, AttributeType.INTEGER, new NonOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, AttributeType.STRING, new NonOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, AttributeType.INTEGER, new NonOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, AttributeType.STRING, new NonOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, AttributeType.INTEGER, new NonOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, AttributeType.STRING, new NonOperation()));
        // Latency-bucket counters and summaries accumulate on merge (AddOperation).
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_S1_LTE, AttributeType.LONG, new AddOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_S3_LTE, AttributeType.LONG, new AddOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_S5_LTE, AttributeType.LONG, new AddOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_S5_GT, AttributeType.LONG, new AddOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_SUMMARY, AttributeType.LONG, new AddOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_ERROR, AttributeType.LONG, new AddOperation()));
        addAttribute(position++, new Attribute(ServiceReferenceTable.COLUMN_COST_SUMMARY, AttributeType.LONG, new AddOperation()));
        // Time bucket keeps its stored value on merge.
        addAttribute(position, new Attribute(ServiceReferenceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new NonOperation()));
    }
}
......@@ -23,10 +23,9 @@ import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,17 +45,14 @@ public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
return getMinId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
}
@Override public void save(Data data) {
String id = ApplicationDataDefine.Application.INSTANCE.getId(data);
int applicationId = ApplicationDataDefine.Application.INSTANCE.getApplicationId(data);
String applicationCode = ApplicationDataDefine.Application.INSTANCE.getApplicationCode(data);
logger.debug("save application register info, application id: {}, application code: {}", applicationId, applicationCode);
@Override public void save(Application application) {
logger.debug("save application register info, application getId: {}, application code: {}", application.getId(), application.getApplicationCode());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
source.put(ApplicationTable.COLUMN_APPLICATION_CODE, applicationCode);
source.put(ApplicationTable.COLUMN_APPLICATION_ID, applicationId);
source.put(ApplicationTable.COLUMN_APPLICATION_CODE, application.getApplicationCode());
source.put(ApplicationTable.COLUMN_APPLICATION_ID, application.getApplicationId());
IndexResponse response = client.prepareIndex(ApplicationTable.TABLE, id).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save application register info, application id: {}, application code: {}, status: {}", applicationId, applicationCode, response.status().name());
IndexResponse response = client.prepareIndex(ApplicationTable.TABLE, application.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save application register info, application getId: {}, application code: {}, status: {}", application.getApplicationId(), application.getApplicationCode(), response.status().name());
}
}
......@@ -22,11 +22,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.ICpuMetricDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,25 +33,25 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, CpuMetric> {
private final Logger logger = LoggerFactory.getLogger(CpuMetricEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
@Override public CpuMetric get(String id) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(CpuMetric cpuMetric) {
Map<String, Object> source = new HashMap<>();
source.put(CpuMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(CpuMetricTable.COLUMN_INSTANCE_ID, cpuMetric.getInstanceId());
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, cpuMetric.getUsagePercent());
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, cpuMetric.getTimeBucket());
logger.debug("prepare cpu metric batch insert, id: {}", data.getDataString(0));
return getClient().prepareIndex(CpuMetricTable.TABLE, data.getDataString(0)).setSource(source);
logger.debug("prepare cpu metric batch insert, getId: {}", cpuMetric.getId());
return getClient().prepareIndex(CpuMetricTable.TABLE, cpuMetric.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(CpuMetric cpuMetric) {
return null;
}
}
......@@ -22,34 +22,33 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IGCMetricDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetricTable;
/**
* @author peng-yongsheng
*/
public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GCMetric> {
@Override public Data get(String id, DataDefine dataDefine) {
@Override public GCMetric get(String id) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(GCMetric gcMetric) {
Map<String, Object> source = new HashMap<>();
source.put(GCMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(1));
source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0));
source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1));
source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2));
source.put(GCMetricTable.COLUMN_INSTANCE_ID, gcMetric.getInstanceId());
source.put(GCMetricTable.COLUMN_PHRASE, gcMetric.getPhrase());
source.put(GCMetricTable.COLUMN_COUNT, gcMetric.getCount());
source.put(GCMetricTable.COLUMN_TIME, gcMetric.getTime());
source.put(GCMetricTable.COLUMN_TIME_BUCKET, gcMetric.getTimeBucket());
return getClient().prepareIndex(GCMetricTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(GCMetricTable.TABLE, gcMetric.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(GCMetric gcMetric) {
return null;
}
}
......@@ -23,11 +23,10 @@ import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.skywalking.apm.collector.storage.table.global.GlobalTraceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,24 +34,24 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GlobalTrace> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
@Override public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
Map<String, Object> source = new HashMap<>();
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getDataString(2));
source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getSegmentId());
source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getGlobalTraceId());
source.put(GlobalTraceTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
logger.debug("global trace source: {}", source.toString());
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getId()).setSource(source);
}
}
......@@ -23,11 +23,10 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.storage.table.instance.InstPerformanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,46 +34,46 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstPerformance> {
private final Logger logger = LoggerFactory.getLogger(InstPerformanceEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
@Override public InstPerformance get(String id) {
GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get();
if (getResponse.isExists()) {
logger.debug("id: {} is exist", id);
Data data = dataDefine.build(id);
logger.debug("getId: {} is exist", id);
InstPerformance instPerformance = new InstPerformance(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(InstPerformanceTable.COLUMN_APPLICATION_ID));
data.setDataInteger(1, (Integer)source.get(InstPerformanceTable.COLUMN_INSTANCE_ID));
data.setDataInteger(2, (Integer)source.get(InstPerformanceTable.COLUMN_CALLS));
data.setDataLong(0, ((Number)source.get(InstPerformanceTable.COLUMN_COST_TOTAL)).longValue());
data.setDataLong(1, ((Number)source.get(InstPerformanceTable.COLUMN_TIME_BUCKET)).longValue());
return data;
instPerformance.setApplicationId((Integer)source.get(InstPerformanceTable.COLUMN_APPLICATION_ID));
instPerformance.setInstanceId((Integer)source.get(InstPerformanceTable.COLUMN_INSTANCE_ID));
instPerformance.setCalls((Integer)source.get(InstPerformanceTable.COLUMN_CALLS));
instPerformance.setCostTotal(((Number)source.get(InstPerformanceTable.COLUMN_COST_TOTAL)).longValue());
instPerformance.setTimeBucket(((Number)source.get(InstPerformanceTable.COLUMN_TIME_BUCKET)).longValue());
return instPerformance;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(InstPerformance data) {
Map<String, Object> source = new HashMap<>();
source.put(InstPerformanceTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(InstPerformanceTable.COLUMN_INSTANCE_ID, data.getDataInteger(1));
source.put(InstPerformanceTable.COLUMN_CALLS, data.getDataInteger(2));
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0));
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1));
source.put(InstPerformanceTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstPerformanceTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(InstPerformanceTable.COLUMN_CALLS, data.getCalls());
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getCostTotal());
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(InstPerformanceTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(InstPerformanceTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(InstPerformance data) {
Map<String, Object> source = new HashMap<>();
source.put(InstPerformanceTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(InstPerformanceTable.COLUMN_INSTANCE_ID, data.getDataInteger(1));
source.put(InstPerformanceTable.COLUMN_CALLS, data.getDataInteger(2));
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0));
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1));
source.put(InstPerformanceTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstPerformanceTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(InstPerformanceTable.COLUMN_CALLS, data.getCalls());
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getCostTotal());
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(InstPerformanceTable.TABLE, data.getDataString(0)).setDoc(source);
return getClient().prepareUpdate(InstPerformanceTable.TABLE, data.getId()).setDoc(source);
}
}
......@@ -30,10 +30,9 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -73,27 +72,19 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
@Override public void save(Data data) {
String id = InstanceDataDefine.Instance.INSTANCE.getId(data);
int instanceId = InstanceDataDefine.Instance.INSTANCE.getInstanceId(data);
int applicationId = InstanceDataDefine.Instance.INSTANCE.getApplicationId(data);
String agentUUID = InstanceDataDefine.Instance.INSTANCE.getAgentUUID(data);
long registerTime = InstanceDataDefine.Instance.INSTANCE.getRegisterTime(data);
long heartBeatTime = InstanceDataDefine.Instance.INSTANCE.getHeartBeatTime(data);
String osInfo = InstanceDataDefine.Instance.INSTANCE.getOsInfo(data);
logger.debug("save instance register info, application id: {}, agentUUID: {}", applicationId, agentUUID);
@Override public void save(Instance instance) {
logger.debug("save instance register info, application getId: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_INSTANCE_ID, instanceId);
source.put(InstanceTable.COLUMN_APPLICATION_ID, applicationId);
source.put(InstanceTable.COLUMN_AGENT_UUID, agentUUID);
source.put(InstanceTable.COLUMN_REGISTER_TIME, registerTime);
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, heartBeatTime);
source.put(InstanceTable.COLUMN_OS_INFO, osInfo);
source.put(InstanceTable.COLUMN_INSTANCE_ID, instance.getInstanceId());
source.put(InstanceTable.COLUMN_APPLICATION_ID, instance.getApplicationId());
source.put(InstanceTable.COLUMN_AGENT_UUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_REGISTER_TIME, instance.getRegisterTime());
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, instance.getHeartBeatTime());
source.put(InstanceTable.COLUMN_OS_INFO, instance.getOsInfo());
IndexResponse response = client.prepareIndex(InstanceTable.TABLE, id).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save instance register info, application id: {}, agentUUID: {}, status: {}", applicationId, agentUUID, response.status().name());
IndexResponse response = client.prepareIndex(InstanceTable.TABLE, instance.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save instance register info, application getId: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name());
}
@Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
......
......@@ -24,11 +24,10 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -36,32 +35,32 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, Instance> {
private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
@Override public Instance get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Instance instance = new Instance(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(InstanceTable.COLUMN_INSTANCE_ID));
data.setDataLong(0, (Long)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME));
logger.debug("id: {} is exists", id);
return data;
instance.setInstanceId((Integer)source.get(InstanceTable.COLUMN_INSTANCE_ID));
instance.setHeartBeatTime((Long)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME));
logger.debug("getId: {} is exists", id);
return instance;
} else {
logger.debug("id: {} is not exists", id);
logger.debug("getId: {} is not exists", id);
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(Instance data) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(Instance data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getDataLong(0));
return getClient().prepareUpdate(InstanceTable.TABLE, data.getDataString(0)).setDoc(source);
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getHeartBeatTime());
return getClient().prepareUpdate(InstanceTable.TABLE, data.getId()).setDoc(source);
}
}
......@@ -22,36 +22,35 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
/**
* @author peng-yongsheng
*/
public class MemoryMetricEsDAO extends EsDAO implements IMemoryMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class MemoryMetricEsDAO extends EsDAO implements IMemoryMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryMetric> {
@Override public Data get(String id, DataDefine dataDefine) {
@Override public MemoryMetric get(String id) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(MemoryMetric data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
source.put(MemoryMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
source.put(MemoryMetricTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(MemoryMetricTable.COLUMN_IS_HEAP, data.getIsHeap());
source.put(MemoryMetricTable.COLUMN_INIT, data.getInit());
source.put(MemoryMetricTable.COLUMN_MAX, data.getMax());
source.put(MemoryMetricTable.COLUMN_USED, data.getUsed());
source.put(MemoryMetricTable.COLUMN_COMMITTED, data.getCommitted());
source.put(MemoryMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(MemoryMetricTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(MemoryMetricTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(MemoryMetric data) {
return null;
}
}
......@@ -22,36 +22,35 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
/**
* @author peng-yongsheng
*/
public class MemoryPoolMetricEsDAO extends EsDAO implements IMemoryPoolMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class MemoryPoolMetricEsDAO extends EsDAO implements IMemoryPoolMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, MemoryPoolMetric> {
@Override public Data get(String id, DataDefine dataDefine) {
@Override public MemoryPoolMetric get(String id) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(MemoryPoolMetric data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryPoolMetricTable.COLUMN_POOL_TYPE, data.getDataInteger(1));
source.put(MemoryPoolMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryPoolMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryPoolMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryPoolMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
source.put(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(MemoryPoolMetricTable.COLUMN_POOL_TYPE, data.getPoolType());
source.put(MemoryPoolMetricTable.COLUMN_INIT, data.getInit());
source.put(MemoryPoolMetricTable.COLUMN_MAX, data.getMax());
source.put(MemoryPoolMetricTable.COLUMN_USED, data.getUsed());
source.put(MemoryPoolMetricTable.COLUMN_COMMITTED, data.getCommitted());
source.put(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(MemoryPoolMetricTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(MemoryPoolMetricTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(MemoryPoolMetric data) {
return null;
}
}
......@@ -23,53 +23,52 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.INodeComponentDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
/**
* @author peng-yongsheng
*/
public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeComponent> {
@Override public Data get(String id, DataDefine dataDefine) {
@Override public NodeComponent get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeComponentTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
NodeComponent nodeComponent = new NodeComponent(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, ((Number)source.get(NodeComponentTable.COLUMN_COMPONENT_ID)).intValue());
data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_COMPONENT_NAME));
data.setDataInteger(1, ((Number)source.get(NodeComponentTable.COLUMN_PEER_ID)).intValue());
data.setDataString(2, (String)source.get(NodeComponentTable.COLUMN_PEER));
data.setDataLong(0, (Long)source.get(NodeComponentTable.COLUMN_TIME_BUCKET));
return data;
nodeComponent.setComponentId(((Number)source.get(NodeComponentTable.COLUMN_COMPONENT_ID)).intValue());
nodeComponent.setComponentName((String)source.get(NodeComponentTable.COLUMN_COMPONENT_NAME));
nodeComponent.setPeerId(((Number)source.get(NodeComponentTable.COLUMN_PEER_ID)).intValue());
nodeComponent.setPeer((String)source.get(NodeComponentTable.COLUMN_PEER));
nodeComponent.setTimeBucket((Long)source.get(NodeComponentTable.COLUMN_TIME_BUCKET));
return nodeComponent;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(NodeComponent data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getComponentId());
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getComponentName());
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getPeerId());
source.put(NodeComponentTable.COLUMN_PEER, data.getPeer());
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(NodeComponentTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(NodeComponentTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(NodeComponent data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getComponentId());
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getComponentName());
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getPeerId());
source.put(NodeComponentTable.COLUMN_PEER, data.getPeer());
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getDataString(0)).setDoc(source);
return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getId()).setDoc(source);
}
}
......@@ -23,68 +23,67 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.noderef.NodeReferenceTable;
/**
* @author peng-yongsheng
*/
public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeReference> {
@Override public Data get(String id, DataDefine dataDefine) {
@Override public NodeReference get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeReferenceTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
NodeReference nodeReference = new NodeReference(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, ((Number)source.get(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
data.setDataInteger(1, ((Number)source.get(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
data.setDataString(1, (String)source.get(NodeReferenceTable.COLUMN_BEHIND_PEER));
data.setDataInteger(2, ((Number)source.get(NodeReferenceTable.COLUMN_S1_LTE)).intValue());
data.setDataInteger(3, ((Number)source.get(NodeReferenceTable.COLUMN_S3_LTE)).intValue());
data.setDataInteger(4, ((Number)source.get(NodeReferenceTable.COLUMN_S5_LTE)).intValue());
data.setDataInteger(5, ((Number)source.get(NodeReferenceTable.COLUMN_S5_GT)).intValue());
data.setDataInteger(6, ((Number)source.get(NodeReferenceTable.COLUMN_SUMMARY)).intValue());
data.setDataInteger(7, ((Number)source.get(NodeReferenceTable.COLUMN_ERROR)).intValue());
data.setDataLong(0, ((Number)source.get(NodeReferenceTable.COLUMN_TIME_BUCKET)).longValue());
return data;
nodeReference.setFrontApplicationId(((Number)source.get(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
nodeReference.setBehindApplicationId(((Number)source.get(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
nodeReference.setBehindPeer((String)source.get(NodeReferenceTable.COLUMN_BEHIND_PEER));
nodeReference.setS1Lte(((Number)source.get(NodeReferenceTable.COLUMN_S1_LTE)).intValue());
nodeReference.setS3Lte(((Number)source.get(NodeReferenceTable.COLUMN_S3_LTE)).intValue());
nodeReference.setS5Lte(((Number)source.get(NodeReferenceTable.COLUMN_S5_LTE)).intValue());
nodeReference.setS5Gt(((Number)source.get(NodeReferenceTable.COLUMN_S5_GT)).intValue());
nodeReference.setSummary(((Number)source.get(NodeReferenceTable.COLUMN_SUMMARY)).intValue());
nodeReference.setError(((Number)source.get(NodeReferenceTable.COLUMN_ERROR)).intValue());
nodeReference.setTimeBucket(((Number)source.get(NodeReferenceTable.COLUMN_TIME_BUCKET)).longValue());
return nodeReference;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(NodeReference data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getDataString(1));
source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getDataInteger(2));
source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getDataInteger(3));
source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getDataInteger(4));
source.put(NodeReferenceTable.COLUMN_S5_GT, data.getDataInteger(5));
source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getDataInteger(6));
source.put(NodeReferenceTable.COLUMN_ERROR, data.getDataInteger(7));
source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getBehindPeer());
source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getS1Lte());
source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getS3Lte());
source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getS5Lte());
source.put(NodeReferenceTable.COLUMN_S5_GT, data.getS5Gt());
source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getSummary());
source.put(NodeReferenceTable.COLUMN_ERROR, data.getError());
source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(NodeReferenceTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(NodeReferenceTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(NodeReference data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getDataString(1));
source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getDataInteger(2));
source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getDataInteger(3));
source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getDataInteger(4));
source.put(NodeReferenceTable.COLUMN_S5_GT, data.getDataInteger(5));
source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getDataInteger(6));
source.put(NodeReferenceTable.COLUMN_ERROR, data.getDataInteger(7));
source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getBehindPeer());
source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getS1Lte());
source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getS3Lte());
source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getS5Lte());
source.put(NodeReferenceTable.COLUMN_S5_GT, data.getS5Gt());
source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getSummary());
source.put(NodeReferenceTable.COLUMN_ERROR, data.getError());
source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(NodeReferenceTable.TABLE, data.getDataString(0)).setDoc(source);
return getClient().prepareUpdate(NodeReferenceTable.TABLE, data.getId()).setDoc(source);
}
}
......@@ -22,11 +22,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
import org.skywalking.apm.collector.storage.table.segment.SegmentCostTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -34,30 +33,30 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, SegmentCost> {
private final Logger logger = LoggerFactory.getLogger(SegmentCostEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
@Override public SegmentCost get(String id) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(SegmentCost data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
logger.debug("segment cost prepareBatchInsert, id: {}", data.getDataString(0));
@Override public IndexRequestBuilder prepareBatchInsert(SegmentCost data) {
logger.debug("segment cost prepareBatchInsert, getId: {}", data.getId());
Map<String, Object> source = new HashMap<>();
source.put(SegmentCostTable.COLUMN_SEGMENT_ID, data.getDataString(1));
source.put(SegmentCostTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(SegmentCostTable.COLUMN_SERVICE_NAME, data.getDataString(2));
source.put(SegmentCostTable.COLUMN_COST, data.getDataLong(0));
source.put(SegmentCostTable.COLUMN_START_TIME, data.getDataLong(1));
source.put(SegmentCostTable.COLUMN_END_TIME, data.getDataLong(2));
source.put(SegmentCostTable.COLUMN_IS_ERROR, data.getDataBoolean(0));
source.put(SegmentCostTable.COLUMN_TIME_BUCKET, data.getDataLong(3));
source.put(SegmentCostTable.COLUMN_SEGMENT_ID, data.getSegmentId());
source.put(SegmentCostTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(SegmentCostTable.COLUMN_SERVICE_NAME, data.getServiceName());
source.put(SegmentCostTable.COLUMN_COST, data.getCost());
source.put(SegmentCostTable.COLUMN_START_TIME, data.getStartTime());
source.put(SegmentCostTable.COLUMN_END_TIME, data.getEndTime());
source.put(SegmentCostTable.COLUMN_IS_ERROR, data.getIsError());
source.put(SegmentCostTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
logger.debug("segment cost source: {}", source.toString());
return getClient().prepareIndex(SegmentCostTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(SegmentCostTable.TABLE, data.getId()).setSource(source);
}
}
......@@ -23,11 +23,10 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.ISegmentDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.collector.storage.table.segment.SegmentTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,22 +34,22 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class SegmentEsDAO extends EsDAO implements ISegmentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class SegmentEsDAO extends EsDAO implements ISegmentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, Segment> {
private final Logger logger = LoggerFactory.getLogger(SegmentEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
@Override public Segment get(String id) {
return null;
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(Segment data) {
Map<String, Object> source = new HashMap<>();
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBytes(0))));
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBinary())));
logger.debug("segment source: {}", source.toString());
return getClient().prepareIndex(SegmentTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(SegmentTable.TABLE, data.getId()).setSource(source);
}
}
......@@ -23,52 +23,51 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IServiceEntryDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.storage.table.service.ServiceEntryTable;
/**
* @author peng-yongsheng
*/
public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceEntry> {
@Override public Data get(String id, DataDefine dataDefine) {
@Override public ServiceEntry get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceEntryTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
ServiceEntry serviceEntry = new ServiceEntry(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, ((Number)source.get(ServiceEntryTable.COLUMN_APPLICATION_ID)).intValue());
data.setDataInteger(1, ((Number)source.get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID)).intValue());
data.setDataString(1, (String)source.get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME));
data.setDataLong(0, ((Number)source.get(ServiceEntryTable.COLUMN_REGISTER_TIME)).longValue());
data.setDataLong(1, ((Number)source.get(ServiceEntryTable.COLUMN_NEWEST_TIME)).longValue());
return data;
serviceEntry.setApplicationId(((Number)source.get(ServiceEntryTable.COLUMN_APPLICATION_ID)).intValue());
serviceEntry.setEntryServiceId(((Number)source.get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID)).intValue());
serviceEntry.setEntryServiceName((String)source.get(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME));
serviceEntry.setRegisterTime(((Number)source.get(ServiceEntryTable.COLUMN_REGISTER_TIME)).longValue());
serviceEntry.setNewestTime(((Number)source.get(ServiceEntryTable.COLUMN_NEWEST_TIME)).longValue());
return serviceEntry;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(ServiceEntry data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getDataLong(1));
return getClient().prepareIndex(ServiceEntryTable.TABLE, data.getDataString(0)).setSource(source);
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getEntryServiceName());
source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getRegisterTime());
source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getNewestTime());
return getClient().prepareIndex(ServiceEntryTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceEntry data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getDataLong(1));
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getEntryServiceName());
source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getRegisterTime());
source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getNewestTime());
return getClient().prepareUpdate(ServiceEntryTable.TABLE, data.getDataString(0)).setDoc(source);
return getClient().prepareUpdate(ServiceEntryTable.TABLE, data.getId()).setDoc(source);
}
}
......@@ -23,10 +23,9 @@ import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,20 +45,15 @@ public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
return getMinId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
}
@Override public void save(Data data) {
String id = ServiceNameDataDefine.ServiceName.INSTANCE.getId(data);
int applicationId = ServiceNameDataDefine.ServiceName.INSTANCE.getApplicationId(data);
int serviceId = ServiceNameDataDefine.ServiceName.INSTANCE.getServiceId(data);
String serviceName = ServiceNameDataDefine.ServiceName.INSTANCE.getServiceName(data);
logger.debug("save service name register info, application id: {}, service name: {}", applicationId, serviceName);
@Override public void save(ServiceName serviceName) {
logger.debug("save service name register info, application getId: {}, service name: {}", serviceName.getId(), serviceName.getServiceName());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
source.put(ServiceNameTable.COLUMN_SERVICE_ID, serviceId);
source.put(ServiceNameTable.COLUMN_APPLICATION_ID, applicationId);
source.put(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName);
source.put(ServiceNameTable.COLUMN_SERVICE_ID, serviceName.getServiceId());
source.put(ServiceNameTable.COLUMN_APPLICATION_ID, serviceName.getApplicationId());
source.put(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName.getServiceName());
IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, id).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save service name register info, application id: {}, service name: {}, status: {}", applicationId, serviceName, response.status().name());
IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, serviceName.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save service name register info, application getId: {}, service name: {}, status: {}", serviceName.getId(), serviceName.getServiceName(), response.status().name());
}
}
......@@ -23,11 +23,10 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.storage.dao.IServiceReferenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReferenceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,72 +34,72 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReference> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
@Override public ServiceReference get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
ServiceReference serviceReference = new ServiceReference(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, ((Number)source.get(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID)).intValue());
data.setDataString(1, (String)source.get(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME));
data.setDataInteger(1, ((Number)source.get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).intValue());
data.setDataString(2, (String)source.get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME));
data.setDataInteger(2, ((Number)source.get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
data.setDataString(3, (String)source.get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME));
data.setDataLong(0, ((Number)source.get(ServiceReferenceTable.COLUMN_S1_LTE)).longValue());
data.setDataLong(1, ((Number)source.get(ServiceReferenceTable.COLUMN_S3_LTE)).longValue());
data.setDataLong(2, ((Number)source.get(ServiceReferenceTable.COLUMN_S5_LTE)).longValue());
data.setDataLong(3, ((Number)source.get(ServiceReferenceTable.COLUMN_S5_GT)).longValue());
data.setDataLong(4, ((Number)source.get(ServiceReferenceTable.COLUMN_SUMMARY)).longValue());
data.setDataLong(5, ((Number)source.get(ServiceReferenceTable.COLUMN_ERROR)).longValue());
data.setDataLong(6, ((Number)source.get(ServiceReferenceTable.COLUMN_COST_SUMMARY)).longValue());
data.setDataLong(7, ((Number)source.get(ServiceReferenceTable.COLUMN_TIME_BUCKET)).longValue());
return data;
serviceReference.setEntryServiceId(((Number)source.get(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID)).intValue());
serviceReference.setEntryServiceName((String)source.get(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME));
serviceReference.setFrontServiceId(((Number)source.get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReference.setFrontServiceName((String)source.get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME));
serviceReference.setBehindServiceId(((Number)source.get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReference.setBehindServiceName((String)source.get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME));
serviceReference.setS1Lte(((Number)source.get(ServiceReferenceTable.COLUMN_S1_LTE)).longValue());
serviceReference.setS3Lte(((Number)source.get(ServiceReferenceTable.COLUMN_S3_LTE)).longValue());
serviceReference.setS5Lte(((Number)source.get(ServiceReferenceTable.COLUMN_S5_LTE)).longValue());
serviceReference.setS5Gt(((Number)source.get(ServiceReferenceTable.COLUMN_S5_GT)).longValue());
serviceReference.setSummary(((Number)source.get(ServiceReferenceTable.COLUMN_SUMMARY)).longValue());
serviceReference.setError(((Number)source.get(ServiceReferenceTable.COLUMN_ERROR)).longValue());
serviceReference.setCostSummary(((Number)source.get(ServiceReferenceTable.COLUMN_COST_SUMMARY)).longValue());
serviceReference.setTimeBucket(((Number)source.get(ServiceReferenceTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReference;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
@Override public IndexRequestBuilder prepareBatchInsert(ServiceReference data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getDataLong(6));
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(7));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getEntryServiceName());
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getFrontServiceName());
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getBehindServiceName());
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getS1Lte());
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getS3Lte());
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getS5Lte());
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getS5Gt());
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getSummary());
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getError());
source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getCostSummary());
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(ServiceReferenceTable.TABLE, data.getDataString(0)).setSource(source);
return getClient().prepareIndex(ServiceReferenceTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReference data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getDataLong(6));
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(7));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getEntryServiceName());
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getFrontServiceName());
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getBehindServiceName());
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getS1Lte());
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getS3Lte());
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getS5Lte());
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getS5Gt());
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getSummary());
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getError());
source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getCostSummary());
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(ServiceReferenceTable.TABLE, data.getDataString(0)).setDoc(source);
return getClient().prepareUpdate(ServiceReferenceTable.TABLE, data.getId()).setDoc(source);
}
}
......@@ -36,7 +36,7 @@ public class MemoryMetricEsTableDefine extends ElasticSearchTableDefine {
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_INIT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_MAX, ElasticSearchColumnDefine.Type.Long.name()));
......
......@@ -40,7 +40,7 @@ public class ApplicationH2CacheDAO extends H2DAO implements IApplicationCacheDAO
@Override
public int getApplicationId(String applicationCode) {
logger.info("get the application id with application code = {}", applicationCode);
logger.info("get the application getId with application code = {}", applicationCode);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_APPLICATION_ID_OR_CODE_SQL, ApplicationTable.COLUMN_APPLICATION_ID, ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_CODE);
......
......@@ -26,7 +26,7 @@ import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.table.register.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -49,9 +49,9 @@ public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
@Override
public void save(Data data) {
String id = ApplicationDataDefine.Application.INSTANCE.getId(data);
int applicationId = ApplicationDataDefine.Application.INSTANCE.getApplicationId(data);
String applicationCode = ApplicationDataDefine.Application.INSTANCE.getApplicationCode(data);
String id = Application.Application.INSTANCE.getId(data);
int applicationId = Application.Application.INSTANCE.getApplicationId(data);
String applicationCode = Application.Application.INSTANCE.getApplicationCode(data);
H2Client client = getClient();
Map<String, Object> source = new HashMap<>();
......
......@@ -49,7 +49,7 @@ public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO, IPersistence
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("prepare cpu metric batch insert, id: {}", data.getDataString(0));
logger.debug("prepare cpu metric batch insert, getId: {}", data.getDataString(0));
String sql = SqlBuilder.buildBatchInsertSql(CpuMetricTable.TABLE, source.keySet());
entity.setSql(sql);
entity.setParams(source.values().toArray(new Object[0]));
......
......@@ -39,7 +39,7 @@ public class InstanceH2CacheDAO extends H2DAO implements IInstanceCacheDAO {
private static final String GET_APPLICATION_ID_SQL = "select {0} from {1} where {2} = ?";
@Override public int getApplicationId(int applicationInstanceId) {
logger.info("get the application id with application id = {}", applicationInstanceId);
logger.info("get the application getId with application getId = {}", applicationInstanceId);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_APPLICATION_ID_SQL, InstanceTable.COLUMN_APPLICATION_ID, InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
Object[] params = new Object[] {applicationInstanceId};
......
......@@ -28,7 +28,7 @@ import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.table.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,7 +43,7 @@ public class InstanceH2DAO extends H2DAO implements IInstanceDAO {
private static final String UPDATE_HEARTBEAT_TIME_SQL = "update {0} set {1} = ? where {2} = ?";
@Override public int getInstanceId(int applicationId, String agentUUID) {
logger.info("get the application id with application id = {}, agentUUID = {}", applicationId, agentUUID);
logger.info("get the application getId with application getId = {}, agentUUID = {}", applicationId, agentUUID);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_INSTANCE_ID_SQL, InstanceTable.COLUMN_INSTANCE_ID, InstanceTable.TABLE, InstanceTable.COLUMN_APPLICATION_ID,
InstanceTable.COLUMN_AGENT_UUID);
......@@ -67,13 +67,13 @@ public class InstanceH2DAO extends H2DAO implements IInstanceDAO {
}
@Override public void save(Data data) {
String id = InstanceDataDefine.Instance.INSTANCE.getId(data);
int instanceId = InstanceDataDefine.Instance.INSTANCE.getInstanceId(data);
int applicationId = InstanceDataDefine.Instance.INSTANCE.getApplicationId(data);
String agentUUID = InstanceDataDefine.Instance.INSTANCE.getAgentUUID(data);
long registerTime = InstanceDataDefine.Instance.INSTANCE.getRegisterTime(data);
long heartBeatTime = InstanceDataDefine.Instance.INSTANCE.getHeartBeatTime(data);
String osInfo = InstanceDataDefine.Instance.INSTANCE.getOsInfo(data);
String id = Instance.Instance.INSTANCE.getId(data);
int instanceId = Instance.Instance.INSTANCE.getInstanceId(data);
int applicationId = Instance.Instance.INSTANCE.getApplicationId(data);
String agentUUID = Instance.Instance.INSTANCE.getAgentUUID(data);
long registerTime = Instance.Instance.INSTANCE.getRegisterTime(data);
long heartBeatTime = Instance.Instance.INSTANCE.getHeartBeatTime(data);
String osInfo = Instance.Instance.INSTANCE.getOsInfo(data);
H2Client client = getClient();
Map<String, Object> source = new HashMap<>();
......
......@@ -41,7 +41,7 @@ public class MemoryMetricH2DAO extends H2DAO implements IMemoryMetricDAO, IPersi
H2SqlEntity entity = new H2SqlEntity();
Map<String, Object> source = new HashMap<>();
source.put(MemoryMetricTable.COLUMN_ID, data.getDataString(0));
source.put(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
source.put(MemoryMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryMetricTable.COLUMN_MAX, data.getDataLong(1));
......
......@@ -42,7 +42,7 @@ public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO, IPersist
}
@Override public H2SqlEntity prepareBatchInsert(Data data) {
logger.debug("segment cost prepareBatchInsert, id: {}", data.getDataString(0));
logger.debug("segment cost prepareBatchInsert, getId: {}", data.getDataString(0));
H2SqlEntity entity = new H2SqlEntity();
Map<String, Object> source = new HashMap<>();
source.put(SegmentCostTable.COLUMN_ID, data.getDataString(0));
......
......@@ -26,7 +26,7 @@ import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.table.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -49,12 +49,12 @@ public class ServiceNameH2DAO extends H2DAO implements IServiceNameDAO {
@Override
public void save(Data data) {
String id = ServiceNameDataDefine.ServiceName.INSTANCE.getId(data);
int applicationId = ServiceNameDataDefine.ServiceName.INSTANCE.getApplicationId(data);
int serviceId = ServiceNameDataDefine.ServiceName.INSTANCE.getServiceId(data);
String serviceName = ServiceNameDataDefine.ServiceName.INSTANCE.getServiceName(data);
String id = ServiceName.ServiceName.INSTANCE.getId(data);
int applicationId = ServiceName.ServiceName.INSTANCE.getApplicationId(data);
int serviceId = ServiceName.ServiceName.INSTANCE.getServiceId(data);
String serviceName = ServiceName.ServiceName.INSTANCE.getServiceName(data);
logger.debug("save service name register info, application id: {}, service name: {}", applicationId, serviceName);
logger.debug("save service name register info, application getId: {}, service name: {}", applicationId, serviceName);
H2Client client = getClient();
Map<String, Object> source = new HashMap<>();
source.put(ServiceNameTable.COLUMN_ID, id);
......
......@@ -33,7 +33,7 @@ public class MemoryMetricH2TableDefine extends H2TableDefine {
@Override public void initialize() {
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, H2ColumnDefine.Type.Boolean.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_INIT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_MAX, H2ColumnDefine.Type.Bigint.name()));
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import org.skywalking.apm.collector.core.data.Data;
/**
 * A single processing stage in a {@link StreamGraph}: consumes one {@code Input}
 * data item and forwards zero or more {@code Output} items downstream.
 *
 * @param <Input> the data type this stage consumes
 * @param <Output> the data type this stage emits to its {@link Next} dispatcher
 * @author peng-yongsheng
 */
@FunctionalInterface
public interface Aggregator<Input extends Data, Output extends Data> {

    /**
     * Process one input item. Implementations invoke {@link Next#execute} on
     * {@code next} once per output item they want to forward; invoking it zero
     * times terminates the flow at this stage.
     *
     * @param input the data item to process
     * @param next dispatcher feeding the downstream node(s)
     */
    void process(Input input, Next<Output> next);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.framework.Executor;
/**
 * Fan-out dispatcher: forwards every data item it receives to all registered
 * downstream {@link Node}s, in registration order.
 *
 * @param <Input> the data type carried to the downstream nodes
 * @author peng-yongsheng
 */
public class Next<Input extends Data> implements Executor<Input> {

    // Downstream nodes; each one receives every item passed to execute().
    // Parameterized as Node<Input> (was raw Node) — erasure is identical, so
    // existing raw-typed callers still compile and link.
    private final List<Node<Input>> nextNodes;

    public Next() {
        this.nextNodes = new ArrayList<>();
    }

    /**
     * Register one more downstream node. Items are dispatched to nodes in the
     * order they were added.
     *
     * @param node the node to append to the fan-out list
     */
    public final void addNext(Node<Input> node) {
        nextNodes.add(node);
    }

    /**
     * Forward {@code input} to every registered downstream node.
     */
    @Override public void execute(Input input) {
        nextNodes.forEach(node -> node.execute(input));
    }
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import org.skywalking.apm.collector.core.data.Data;
/**
 * A vertex in a {@link StreamGraph}: wraps one {@link Aggregator} together with
 * the {@link Next} dispatcher that feeds the node's downstream successors.
 *
 * @param <Input> the data type this node's aggregator consumes
 * @author peng-yongsheng
 */
public class Node<Input extends Data> {

    // NOTE(review): raw Aggregator — its Output type should match the element
    // type of `next` below, but expressing that would require a second type
    // parameter on Node (an interface change); left raw for now.
    private final Aggregator aggregator;

    // Dispatcher feeding downstream nodes. NOTE(review): declared Next<Input>,
    // but semantically it carries the aggregator's *output* type; this only
    // compiles because the aggregator field above is raw — verify before
    // tightening the generics.
    private final Next<Input> next;

    public Node(Aggregator aggregator) {
        this.aggregator = aggregator;
        this.next = new Next<>();
    }

    /**
     * Append a downstream node driven by the given aggregator and return the
     * NEW node (not {@code this}), enabling linear chaining:
     * {@code graph.addNode(a).addNext(b).addNext(c)}.
     */
    public final Node addNext(Aggregator aggregator) {
        Node node = new Node(aggregator);
        next.addNext(node);
        return node;
    }

    // Run this node's aggregator on the input; the aggregator decides what, if
    // anything, to forward via `next`. Package-private: only Next/StreamGraph
    // drive execution.
    final void execute(Input input) {
        aggregator.process(input, next);
    }
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import org.skywalking.apm.collector.core.data.Data;
/**
 * Entry point of a stream-processing graph. Holds the first {@link Node};
 * further nodes are chained from it via {@link Node#addNext(Aggregator)}.
 *
 * @author peng-yongsheng
 */
public class StreamGraph {

    // First node of the graph; set exactly once by addNode().
    private Node startNode;

    /**
     * Push one data item into the graph, starting at the first node.
     *
     * @param input the data item to process
     * @throws IllegalStateException if {@link #addNode(Aggregator)} was never
     * called (previously this failed with a bare NullPointerException)
     */
    public void start(Data input) {
        if (startNode == null) {
            throw new IllegalStateException("StreamGraph has no start node; call addNode(...) before start(...)");
        }
        startNode.execute(input);
    }

    /**
     * Install the graph's first node, replacing any previous one, and return
     * it so callers can chain downstream nodes with {@link Node#addNext}.
     *
     * @param aggregator the processing stage for the first node
     * @return the newly created start node
     */
    public Node addNode(Aggregator aggregator) {
        startNode = new Node(aggregator);
        return startNode;
    }
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
import org.junit.Test;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.storage.table.register.Application;
/**
 * Exercises a two-stage {@link StreamGraph}: an InstPerformance item enters the
 * graph, the first stage maps it to an Application, the second stage consumes
 * and discards it.
 *
 * @author peng-yongsheng
 */
public class StreamGraphTestCase {

    @Test
    public void test() {
        StreamGraph graph = new StreamGraph();

        // Stage 1: map each InstPerformance item to an Application item.
        Aggregator<InstPerformance, Application> mapToApplication =
            (performance, next) -> next.execute(new Application("111"));

        // Stage 2: terminal stage — consumes the Application, forwards nothing.
        Aggregator<Application, InstPerformance> sink = (application, next) -> {
        };

        graph.addNode(mapToApplication).addNext(sink);

        graph.start(new InstPerformance("111"));
    }
}
......@@ -28,6 +28,8 @@ import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
/**
* @author peng-yongsheng
......@@ -46,9 +48,12 @@ public class StreamModuleProvider extends ModuleProvider {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
PersistenceTimer persistenceTimer = new PersistenceTimer();
try {
QueueCreatorService queueCreatorService = getManager().find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteClientService remoteClientService = getManager().find(RemoteModule.NAME).getService(RemoteClientService.class);
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
persistenceTimer.start(daoService);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
}
......
......@@ -22,10 +22,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.FlushAndSwitch;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorkerContainer;
......@@ -35,19 +34,19 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class PersistenceTimer implements Starter {
public class PersistenceTimer {
private final Logger logger = LoggerFactory.getLogger(PersistenceTimer.class);
public void start() {
public void start(DAOService daoService) {
logger.info("persistence timer start");
//TODO timer value config
// final long timeInterval = EsConfig.Es.Persistence.Timer.VALUE * 1000;
final long timeInterval = 3;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(), 1, timeInterval, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> extractDataAndSave(daoService), 1, timeInterval, TimeUnit.SECONDS);
}
private void extractDataAndSave() {
private void extractDataAndSave(DAOService daoService) {
try {
List<PersistenceWorker> workers = PersistenceWorkerContainer.INSTANCE.getPersistenceWorkers();
List batchAllCollection = new ArrayList<>();
......@@ -63,7 +62,7 @@ public class PersistenceTimer implements Starter {
}
});
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class);
dao.batchPersistence(batchAllCollection);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
......
......@@ -90,10 +90,10 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
protected final void aggregate(Object message) {
Data data = (Data)message;
dataCache.writing();
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(dataCache.get(data.id()), data);
if (dataCache.containsKey(data.getId())) {
getRole().dataDefine().mergeData(dataCache.get(data.getId()), data);
} else {
dataCache.put(data.id(), data);
dataCache.put(data.getId(), data);
}
dataCache.finishWriting();
}
......
......@@ -139,10 +139,10 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
dataCache.writing();
Data data = (Data)message;
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(dataCache.get(data.id()), data);
if (dataCache.containsKey(data.getId())) {
getRole().dataDefine().mergeData(dataCache.get(data.getId()), data);
} else {
dataCache.put(data.id(), data);
dataCache.put(data.getId(), data);
}
dataCache.finishWriting();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册