提交 b452cdd2 编写于 作者: P pengys5

Add memory pool metric record persistence, but not test.

上级 e6907b28
......@@ -7,6 +7,8 @@ import org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPe
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
......@@ -19,6 +21,7 @@ import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,6 +41,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList());
});
responseObserver.onNext(Downstream.newBuilder().build());
......@@ -62,11 +66,9 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<Memory> memories) {
for (int i = 0; i < memories.size(); i++) {
Memory memory = memories.get(i);
memories.forEach(memory -> {
MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric();
memoryMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(i));
memoryMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap()));
memoryMetric.setApplicationInstanceId(applicationInstanceId);
memoryMetric.setHeap(memory.getIsHeap());
memoryMetric.setInit(memory.getInit());
......@@ -80,6 +82,29 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
});
}
private void sendToMemoryPoolMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<MemoryPool> memoryPools) {
memoryPools.forEach(memoryPool -> {
MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric();
memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
memoryPoolMetric.setApplicationInstanceId(applicationInstanceId);
memoryPoolMetric.setPoolType(memoryPool.getType().getNumber());
memoryPoolMetric.setHeap(memoryPool.getIsHeap());
memoryPoolMetric.setInit(memoryPool.getInit());
memoryPoolMetric.setMax(memoryPool.getMax());
memoryPoolMetric.setUsed(memoryPool.getUsed());
memoryPoolMetric.setCommitted(memoryPool.getCommited());
memoryPoolMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory pool metric persistence worker, id: {}", memoryPoolMetric.getId());
context.getClusterWorkerContext().lookup(MemoryPoolMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryPoolMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao.IMemoryPoolMetricDAO;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker {
public MemoryPoolMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IMemoryPoolMetricDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryPoolMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public MemoryPoolMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new MemoryPoolMetricPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return MemoryPoolMetricPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new MemoryPoolMetricDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao;
/**
* @author pengys5
*/
public interface IMemoryPoolMetricDAO {
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class MemoryPoolMetricEsDAO extends EsDAO implements IMemoryPoolMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryPoolMetricTable.COLUMN_POOL_TYPE, data.getDataInteger(1));
source.put(MemoryPoolMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
source.put(MemoryPoolMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryPoolMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryPoolMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryPoolMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
return getClient().prepareIndex(MemoryPoolMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class MemoryPoolMetricH2DAO extends H2DAO implements IMemoryPoolMetricDAO {
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class MemoryPoolMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 9;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(MemoryPoolMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(MemoryPoolMetricTable.COLUMN_POOL_TYPE, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(MemoryPoolMetricTable.COLUMN_IS_HEAP, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(4, new Attribute(MemoryPoolMetricTable.COLUMN_INIT, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(MemoryPoolMetricTable.COLUMN_MAX, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(MemoryPoolMetricTable.COLUMN_USED, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(MemoryPoolMetricTable.COLUMN_COMMITTED, AttributeType.LONG, new CoverOperation()));
addAttribute(8, new Attribute(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
public static class MemoryPoolMetric implements Transform<MemoryPoolMetric> {
private String id;
private int applicationInstanceId;
private int poolType;
private boolean isHeap;
private long init;
private long max;
private long used;
private long committed;
private long timeBucket;
public MemoryPoolMetric(String id, int applicationInstanceId, int poolType, boolean isHeap, long init, long max,
long used, long committed, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.poolType = poolType;
this.isHeap = isHeap;
this.init = init;
this.max = max;
this.used = used;
this.committed = committed;
this.timeBucket = timeBucket;
}
public MemoryPoolMetric() {
}
@Override public Data toData() {
MemoryPoolMetricDataDefine define = new MemoryPoolMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataInteger(1, this.poolType);
data.setDataBoolean(0, this.isHeap);
data.setDataLong(0, this.init);
data.setDataLong(1, this.max);
data.setDataLong(2, this.used);
data.setDataLong(3, this.committed);
data.setDataLong(4, this.timeBucket);
return data;
}
@Override public MemoryPoolMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.poolType = data.getDataInteger(1);
this.isHeap = data.getDataBoolean(0);
this.init = data.getDataLong(0);
this.max = data.getDataLong(1);
this.used = data.getDataLong(2);
this.committed = data.getDataLong(3);
this.timeBucket = data.getDataLong(4);
return this;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public void setPoolType(int poolType) {
this.poolType = poolType;
}
public void setHeap(boolean heap) {
isHeap = heap;
}
public void setInit(long init) {
this.init = init;
}
public void setMax(long max) {
this.max = max;
}
public void setUsed(long used) {
this.used = used;
}
public void setCommitted(long committed) {
this.committed = committed;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getId() {
return id;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public long getTimeBucket() {
return timeBucket;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class MemoryPoolMetricEsTableDefine extends ElasticSearchTableDefine {
public MemoryPoolMetricEsTableDefine() {
super(MemoryPoolMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_POOL_TYPE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_INIT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_MAX, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_USED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_COMMITTED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class MemoryPoolMetricH2TableDefine extends H2TableDefine {
public MemoryPoolMetricH2TableDefine() {
super(MemoryPoolMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_POOL_TYPE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_IS_HEAP, H2ColumnDefine.Type.Boolean.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_INIT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_MAX, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_USED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_COMMITTED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class MemoryPoolMetricTable extends CommonTable {
public static final String TABLE = "memory_pool_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_POOL_TYPE = "pool_type";
public static final String COLUMN_IS_HEAP = "is_heap";
public static final String COLUMN_INIT = "init";
public static final String COLUMN_MAX = "max";
public static final String COLUMN_USED = "used";
public static final String COLUMN_COMMITTED = "committed";
}
......@@ -11,4 +11,5 @@ org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.dao.MemoryMetricEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.dao.MemoryMetricEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao.MemoryPoolMetricEsDAO
\ No newline at end of file
......@@ -11,4 +11,5 @@ org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.dao.MemoryMetricH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.dao.MemoryMetricH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.dao.MemoryPoolMetricH2DAO
\ No newline at end of file
......@@ -22,6 +22,7 @@ org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWor
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.MemoryMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.MemoryPoolMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
......
......@@ -38,4 +38,7 @@ org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricEs
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricH2TableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.define.MemoryMetricEsTableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.define.MemoryMetricH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memory.define.MemoryMetricH2TableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricEsTableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.memorypool.define.MemoryPoolMetricH2TableDefine
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册