提交 b560ef1f 编写于 作者: P pengys5

Add memory metric record persistence, but not test.

#346
上级 a798bb64
package org.skywalking.apm.collector.agentstream.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
......@@ -15,6 +18,7 @@ import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -33,6 +37,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
});
responseObserver.onNext(Downstream.newBuilder().build());
......@@ -53,4 +58,28 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
logger.error(e.getMessage(), e);
}
}
private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<Memory> memories) {
for (int i = 0; i < memories.size(); i++) {
Memory memory = memories.get(i);
MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric();
memoryMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(i));
memoryMetric.setApplicationInstanceId(applicationInstanceId);
memoryMetric.setHeap(memory.getIsHeap());
memoryMetric.setInit(memory.getInit());
memoryMetric.setMax(memory.getMax());
memoryMetric.setUsed(memory.getUsed());
memoryMetric.setCommitted(memory.getCommitted());
memoryMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory metric persistence worker, id: {}", memoryMetric.getId());
context.getClusterWorkerContext().lookup(MemoryMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
}
......@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
......@@ -61,7 +61,7 @@ public class CpuMetricPersistenceWorker extends PersistenceWorker {
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
......
......@@ -8,5 +8,5 @@ import org.skywalking.apm.collector.agentstream.worker.CommonTable;
public class CpuMetricTable extends CommonTable {
public static final String TABLE = "cpu_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_USAGE_PERCENT = "application_instance_id";
public static final String COLUMN_USAGE_PERCENT = "usage_percent";
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao.IMemoryMetricDAO;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class MemoryMetricPersistenceWorker extends PersistenceWorker {
public MemoryMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IMemoryMetricDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public MemoryMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new MemoryMetricPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return MemoryMetricPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new MemoryMetricDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao;
/**
* @author pengys5
*/
public interface IMemoryMetricDAO {
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.MemoryMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class MemoryMetricEsDAO extends EsDAO implements IMemoryMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
source.put(MemoryMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
return getClient().prepareIndex(MemoryMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class MemoryMetricH2DAO extends H2DAO implements IMemoryMetricDAO {
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class MemoryMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 8;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(MemoryMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(MemoryMetricTable.COLUMN_IS_HEAP, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(3, new Attribute(MemoryMetricTable.COLUMN_INIT, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(MemoryMetricTable.COLUMN_MAX, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(MemoryMetricTable.COLUMN_USED, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(MemoryMetricTable.COLUMN_COMMITTED, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(MemoryMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
public static class MemoryMetric implements Transform<MemoryMetric> {
private String id;
private int applicationInstanceId;
private boolean isHeap;
private long init;
private long max;
private long used;
private long committed;
private long timeBucket;
public MemoryMetric(String id, int applicationInstanceId, boolean isHeap, long init, long max, long used,
long committed, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.isHeap = isHeap;
this.init = init;
this.max = max;
this.used = used;
this.committed = committed;
this.timeBucket = timeBucket;
}
public MemoryMetric() {
}
@Override public Data toData() {
MemoryMetricDataDefine define = new MemoryMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataBoolean(0, this.isHeap);
data.setDataLong(0, this.init);
data.setDataLong(1, this.max);
data.setDataLong(2, this.used);
data.setDataLong(3, this.committed);
data.setDataLong(4, this.timeBucket);
return data;
}
@Override public MemoryMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.isHeap = data.getDataBoolean(0);
this.init = data.getDataLong(0);
this.max = data.getDataLong(1);
this.used = data.getDataLong(2);
this.committed = data.getDataLong(3);
this.timeBucket = data.getDataLong(4);
return this;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public void setHeap(boolean heap) {
isHeap = heap;
}
public void setInit(long init) {
this.init = init;
}
public void setMax(long max) {
this.max = max;
}
public void setUsed(long used) {
this.used = used;
}
public void setCommitted(long committed) {
this.committed = committed;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getId() {
return id;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public long getTimeBucket() {
return timeBucket;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class MemoryMetricEsTableDefine extends ElasticSearchTableDefine {
public MemoryMetricEsTableDefine() {
super(MemoryMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_INIT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_MAX, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_USED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_COMMITTED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class MemoryMetricH2TableDefine extends H2TableDefine {
public MemoryMetricH2TableDefine() {
super(MemoryMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, H2ColumnDefine.Type.Boolean.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_INIT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_MAX, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_USED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_COMMITTED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class MemoryMetricTable extends CommonTable {
public static final String TABLE = "memory_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_IS_HEAP = "is_heap";
public static final String COLUMN_INIT = "init";
public static final String COLUMN_MAX = "max";
public static final String COLUMN_USED = "used";
public static final String COLUMN_COMMITTED = "committed";
}
......@@ -10,4 +10,5 @@ org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDA
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao.MemoryMetricEsDAO
\ No newline at end of file
......@@ -10,4 +10,5 @@ org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DA
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.dao.MemoryMetricH2DAO
\ No newline at end of file
......@@ -21,6 +21,7 @@ org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersiste
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.MemoryMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
......
......@@ -35,4 +35,7 @@ org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.Serv
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefH2TableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricEsTableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricH2TableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.MemoryMetricEsTableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.gc.define.MemoryMetricH2TableDefine
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册