提交 ecc050c6 编写于 作者: P pengys5

Add gc metric record persistence, but not test

上级 b452cdd2
......@@ -5,6 +5,8 @@ import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.MemoryPoolMetricPersistenceWorker;
......@@ -18,6 +20,7 @@ import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
......@@ -42,6 +45,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList());
sendToGCMetricPersistenceWorker(context, applicationInstanceId, time, metric.getGcList());
});
responseObserver.onNext(Downstream.newBuilder().build());
......@@ -107,4 +111,23 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
}
});
}
private void sendToGCMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> {
GCMetricDataDefine.GCMetric gcMetric = new GCMetricDataDefine.GCMetric();
gcMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue()));
gcMetric.setApplicationInstanceId(applicationInstanceId);
gcMetric.setPhrase(gc.getPhraseValue());
gcMetric.setCount(gc.getCount());
gcMetric.setTime(gc.getTime());
gcMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId());
context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
}
......@@ -22,7 +22,7 @@ public class CpuMetricDataDefine extends DataDefine {
addAttribute(0, new Attribute(CpuMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(CpuMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(CpuMetricTable.COLUMN_USAGE_PERCENT, AttributeType.DOUBLE, new CoverOperation()));
addAttribute(2, new Attribute(CpuMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
addAttribute(3, new Attribute(CpuMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
......
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao.IGCMetricDAO;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class GCMetricPersistenceWorker extends PersistenceWorker {
public GCMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IGCMetricDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GCMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public GCMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new GCMetricPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return GCMetricPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new GCMetricDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0));
source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1));
source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2));
return getClient().prepareIndex(GCMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO {
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao;
/**
* @author pengys5
*/
public interface IGCMetricDAO {
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class GCMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 6;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(GCMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(GCMetricTable.COLUMN_PHRASE, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(GCMetricTable.COLUMN_COUNT, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(GCMetricTable.COLUMN_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(GCMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
public static class GCMetric implements Transform<GCMetric> {
private String id;
private int applicationInstanceId;
private int phrase;
private long count;
private long time;
private long timeBucket;
public GCMetric(String id, int applicationInstanceId, int phrase, long count, long time, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.phrase = phrase;
this.count = count;
this.time = time;
this.timeBucket = timeBucket;
}
public GCMetric() {
}
@Override public Data toData() {
GCMetricDataDefine define = new GCMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataInteger(1, this.phrase);
data.setDataLong(0, this.count);
data.setDataLong(1, this.time);
data.setDataLong(2, this.timeBucket);
return data;
}
@Override public GCMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.phrase = data.getDataInteger(1);
this.count = data.getDataLong(0);
this.time = data.getDataLong(1);
this.timeBucket = data.getDataLong(2);
return this;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getId() {
return id;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public long getTimeBucket() {
return timeBucket;
}
public int getPhrase() {
return phrase;
}
public long getCount() {
return count;
}
public long getTime() {
return time;
}
public void setPhrase(int phrase) {
this.phrase = phrase;
}
public void setCount(long count) {
this.count = count;
}
public void setTime(long time) {
this.time = time;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class GCMetricEsTableDefine extends ElasticSearchTableDefine {
public GCMetricEsTableDefine() {
super(GCMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_PHRASE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_COUNT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class GCMetricH2TableDefine extends H2TableDefine {
public GCMetricH2TableDefine() {
super(GCMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_PHRASE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_COUNT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class GCMetricTable extends CommonTable {
public static final String TABLE = "gc_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_PHRASE = "phrase";
public static final String COLUMN_COUNT = "count";
public static final String COLUMN_TIME = "time";
}
......@@ -12,4 +12,5 @@ org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEs
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.dao.MemoryMetricEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao.MemoryPoolMetricEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao.MemoryPoolMetricEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao.GCMetricEsDAO
\ No newline at end of file
......@@ -12,4 +12,5 @@ org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.dao.MemoryMetricH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao.MemoryPoolMetricH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao.MemoryPoolMetricH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao.GCMetricH2DAO
\ No newline at end of file
......@@ -23,6 +23,7 @@ org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWor
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.MemoryMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.MemoryPoolMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.GCMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
......
......@@ -41,4 +41,7 @@ org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.define.MemoryMe
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.define.MemoryMetricH2TableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricEsTableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricH2TableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.GCMetricEsTableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.GCMetricH2TableDefine
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册