Commit 933b2585 authored by wu-sheng, committed by GitHub

Merge branch 'master' into zhangxin/feature/support-rest-template

......
@@ -13,6 +13,11 @@
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
......
@@ -25,7 +30,7 @@
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<artifactId>apm-collector-storage</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
package org.skywalking.apm.collector.agentjvm.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int applicationInstanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", applicationInstanceId);
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList());
sendToGCMetricPersistenceWorker(context, applicationInstanceId, time, metric.getGcList());
});
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, CPU cpu) {
CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric();
cpuMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId);
cpuMetric.setApplicationInstanceId(applicationInstanceId);
cpuMetric.setUsagePercent(cpu.getUsagePercent());
cpuMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to cpu metric persistence worker, id: {}", cpuMetric.getId());
context.getClusterWorkerContext().lookup(CpuMetricPersistenceWorker.WorkerRole.INSTANCE).tell(cpuMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<Memory> memories) {
memories.forEach(memory -> {
MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric();
memoryMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap()));
memoryMetric.setApplicationInstanceId(applicationInstanceId);
memoryMetric.setHeap(memory.getIsHeap());
memoryMetric.setInit(memory.getInit());
memoryMetric.setMax(memory.getMax());
memoryMetric.setUsed(memory.getUsed());
memoryMetric.setCommitted(memory.getCommitted());
memoryMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory metric persistence worker, id: {}", memoryMetric.getId());
context.getClusterWorkerContext().lookup(MemoryMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
private void sendToMemoryPoolMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<MemoryPool> memoryPools) {
memoryPools.forEach(memoryPool -> {
MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric();
memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
memoryPoolMetric.setApplicationInstanceId(applicationInstanceId);
memoryPoolMetric.setPoolType(memoryPool.getType().getNumber());
memoryPoolMetric.setHeap(memoryPool.getIsHeap());
memoryPoolMetric.setInit(memoryPool.getInit());
memoryPoolMetric.setMax(memoryPool.getMax());
memoryPoolMetric.setUsed(memoryPool.getUsed());
memoryPoolMetric.setCommitted(memoryPool.getCommited());
memoryPoolMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory pool metric persistence worker, id: {}", memoryPoolMetric.getId());
context.getClusterWorkerContext().lookup(MemoryPoolMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryPoolMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
private void sendToGCMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId,
long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> {
GCMetricDataDefine.GCMetric gcMetric = new GCMetricDataDefine.GCMetric();
gcMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue()));
gcMetric.setApplicationInstanceId(applicationInstanceId);
gcMetric.setPhrase(gc.getPhraseValue());
gcMetric.setCount(gc.getCount());
gcMetric.setTime(gc.getTime());
gcMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId());
context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
}
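For orientation, the row ids built in the handler above are the time bucket joined to the application instance id, with a per-type discriminator appended for memory, memory-pool and GC rows. A minimal sketch of the resulting keys, assuming Const.ID_SPLIT is an underscore (the constant's value is not part of this diff):
public class MetricIdSketch {
    public static void main(String[] args) {
        long timeBucket = 20170925120000L; // second-level bucket from TimeBucketUtils.getSecondTimeBucket
        int applicationInstanceId = 1;
        String idSplit = "_"; // assumption: the real value comes from Const.ID_SPLIT
        System.out.println(timeBucket + idSplit + applicationInstanceId);                  // cpu metric id
        System.out.println(timeBucket + idSplit + applicationInstanceId + idSplit + true); // heap memory metric id
    }
}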
package org.skywalking.apm.collector.agentjvm.worker.cpu;
import org.skywalking.apm.collector.agentjvm.worker.cpu.dao.ICpuMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class CpuMetricPersistenceWorker extends PersistenceWorker {
public CpuMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(ICpuMetricDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<CpuMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public CpuMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new CpuMetricPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return CpuMetricPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new CpuMetricDataDefine();
}
}
}
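The worker above is wired through two nested pieces: the Factory (an AbstractLocalAsyncWorkerProvider) instantiates it with a 1024-slot queue, and the WorkerRole enum binds its name, its DataDefine, and a RollingSelector for dispatch. A minimal illustration of the round-robin policy that selector name implies; this is not the collector's actual RollingSelector, just the idea:
import java.util.List;

class RollingSelectorSketch<T> {
    private int index = 0;

    // Rotate through the available workers on every call.
    T select(List<T> members) {
        index = Math.abs(index + 1) % members.size();
        return members.get(index);
    }
}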
package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(CpuMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
return getClient().prepareIndex(CpuMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;
/**
* @author pengys5
*/
public interface ICpuMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.cpu.define;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class CpuMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(CpuMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(CpuMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(CpuMetricTable.COLUMN_USAGE_PERCENT, AttributeType.DOUBLE, new CoverOperation()));
addAttribute(3, new Attribute(CpuMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
throw new UnexpectedException("cpu metric data did not need send to remote worker.");
}
@Override public RemoteData serialize(Object object) {
throw new UnexpectedException("cpu metric data did not need send to remote worker.");
}
public static class CpuMetric implements Transform<CpuMetric> {
private String id;
private int applicationInstanceId;
private double usagePercent;
private long timeBucket;
public CpuMetric(String id, int applicationInstanceId, double usagePercent, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.usagePercent = usagePercent;
this.timeBucket = timeBucket;
}
public CpuMetric() {
}
@Override public Data toData() {
CpuMetricDataDefine define = new CpuMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataDouble(0, this.usagePercent);
data.setDataLong(0, this.timeBucket);
return data;
}
@Override public CpuMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.usagePercent = data.getDataDouble(0);
this.timeBucket = data.getDataLong(0);
return this;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public void setUsagePercent(double usagePercent) {
this.usagePercent = usagePercent;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getId() {
return id;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public double getUsagePercent() {
return usagePercent;
}
public long getTimeBucket() {
return timeBucket;
}
}
}
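The Transform contract above pairs toData and toSelf so a metric can travel through the worker pipeline as a positional Data record and be rebuilt afterwards. A round-trip sketch, assuming it lives in the same package as the classes above (the positions must match attributeDefine(): string 0 = id, integer 0 = instance id, double 0 = usage percent, long 0 = time bucket):
import org.skywalking.apm.collector.stream.worker.impl.data.Data;

public class CpuMetricRoundTripSketch {
    public static void main(String[] args) {
        CpuMetricDataDefine.CpuMetric metric =
            new CpuMetricDataDefine.CpuMetric("20170925120000_1", 1, 70.0, 20170925120000L);
        Data data = metric.toData();                          // serialize into positional slots
        CpuMetricDataDefine.CpuMetric rebuilt =
            new CpuMetricDataDefine.CpuMetric().toSelf(data); // rebuild from those slots
        System.out.println(rebuilt.getUsagePercent());        // 70.0
    }
}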
package org.skywalking.apm.collector.agentjvm.worker.cpu.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class CpuMetricEsTableDefine extends ElasticSearchTableDefine {
public CpuMetricEsTableDefine() {
super(CpuMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_USAGE_PERCENT, ElasticSearchColumnDefine.Type.Double.name()));
addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.cpu.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class CpuMetricH2TableDefine extends H2TableDefine {
public CpuMetricH2TableDefine() {
super(CpuMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(CpuMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(CpuMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(CpuMetricTable.COLUMN_USAGE_PERCENT, H2ColumnDefine.Type.Double.name()));
addColumn(new H2ColumnDefine(CpuMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.cpu.define;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
*/
public class CpuMetricTable extends CommonTable {
public static final String TABLE = "cpu_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_USAGE_PERCENT = "usage_percent";
}
package org.skywalking.apm.collector.agentjvm.worker.gc;
import org.skywalking.apm.collector.agentjvm.worker.gc.dao.IGCMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class GCMetricPersistenceWorker extends PersistenceWorker {
public GCMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IGCMetricDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GCMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public GCMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new GCMetricPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return GCMetricPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new GCMetricDataDefine();
}
}
}
package org.skywalking.apm.collector.agentjvm.worker.gc.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(1));
source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0));
source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1));
source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2));
return getClient().prepareIndex(GCMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentjvm.worker.gc.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.gc.dao;
/**
* @author pengys5
*/
public interface IGCMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.gc.define;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class GCMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 6;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(GCMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(GCMetricTable.COLUMN_PHRASE, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(GCMetricTable.COLUMN_COUNT, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(GCMetricTable.COLUMN_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(GCMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
throw new UnexpectedException("gc metric data did not need send to remote worker.");
}
@Override public RemoteData serialize(Object object) {
throw new UnexpectedException("gc metric data did not need send to remote worker.");
}
public static class GCMetric implements Transform<GCMetric> {
private String id;
private int applicationInstanceId;
private int phrase;
private long count;
private long time;
private long timeBucket;
public GCMetric(String id, int applicationInstanceId, int phrase, long count, long time, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.phrase = phrase;
this.count = count;
this.time = time;
this.timeBucket = timeBucket;
}
public GCMetric() {
}
@Override public Data toData() {
GCMetricDataDefine define = new GCMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataInteger(1, this.phrase);
data.setDataLong(0, this.count);
data.setDataLong(1, this.time);
data.setDataLong(2, this.timeBucket);
return data;
}
@Override public GCMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.phrase = data.getDataInteger(1);
this.count = data.getDataLong(0);
this.time = data.getDataLong(1);
this.timeBucket = data.getDataLong(2);
return this;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getId() {
return id;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public long getTimeBucket() {
return timeBucket;
}
public int getPhrase() {
return phrase;
}
public long getCount() {
return count;
}
public long getTime() {
return time;
}
public void setPhrase(int phrase) {
this.phrase = phrase;
}
public void setCount(long count) {
this.count = count;
}
public void setTime(long time) {
this.time = time;
}
}
}
package org.skywalking.apm.collector.agentjvm.worker.gc.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class GCMetricEsTableDefine extends ElasticSearchTableDefine {
public GCMetricEsTableDefine() {
super(GCMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_PHRASE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_COUNT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.gc.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class GCMetricH2TableDefine extends H2TableDefine {
public GCMetricH2TableDefine() {
super(GCMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_PHRASE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_COUNT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.gc.define;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
*/
public class GCMetricTable extends CommonTable {
public static final String TABLE = "gc_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_PHRASE = "phrase";
public static final String COLUMN_COUNT = "count";
public static final String COLUMN_TIME = "time";
}
package org.skywalking.apm.collector.agentjvm.worker.memory;
import org.skywalking.apm.collector.agentjvm.worker.memory.dao.IMemoryMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class MemoryMetricPersistenceWorker extends PersistenceWorker {
public MemoryMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IMemoryMetricDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public MemoryMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new MemoryMetricPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return MemoryMetricPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new MemoryMetricDataDefine();
}
}
}
package org.skywalking.apm.collector.agentjvm.worker.memory.dao;
/**
* @author pengys5
*/
public interface IMemoryMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.memory.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class MemoryMetricEsDAO extends EsDAO implements IMemoryMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
source.put(MemoryMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
return getClient().prepareIndex(MemoryMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentjvm.worker.memory.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class MemoryMetricH2DAO extends H2DAO implements IMemoryMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.memory.define;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class MemoryMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 8;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(MemoryMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(MemoryMetricTable.COLUMN_IS_HEAP, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(3, new Attribute(MemoryMetricTable.COLUMN_INIT, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(MemoryMetricTable.COLUMN_MAX, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(MemoryMetricTable.COLUMN_USED, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(MemoryMetricTable.COLUMN_COMMITTED, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(MemoryMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
throw new UnexpectedException("memory metric data did not need send to remote worker.");
}
@Override public RemoteData serialize(Object object) {
throw new UnexpectedException("memory metric data did not need send to remote worker.");
}
public static class MemoryMetric implements Transform<MemoryMetric> {
private String id;
private int applicationInstanceId;
private boolean isHeap;
private long init;
private long max;
private long used;
private long committed;
private long timeBucket;
public MemoryMetric(String id, int applicationInstanceId, boolean isHeap, long init, long max, long used,
long committed, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.isHeap = isHeap;
this.init = init;
this.max = max;
this.used = used;
this.committed = committed;
this.timeBucket = timeBucket;
}
public MemoryMetric() {
}
@Override public Data toData() {
MemoryMetricDataDefine define = new MemoryMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataBoolean(0, this.isHeap);
data.setDataLong(0, this.init);
data.setDataLong(1, this.max);
data.setDataLong(2, this.used);
data.setDataLong(3, this.committed);
data.setDataLong(4, this.timeBucket);
return data;
}
@Override public MemoryMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.isHeap = data.getDataBoolean(0);
this.init = data.getDataLong(0);
this.max = data.getDataLong(1);
this.used = data.getDataLong(2);
this.committed = data.getDataLong(3);
this.timeBucket = data.getDataLong(4);
return this;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public void setHeap(boolean heap) {
isHeap = heap;
}
public void setInit(long init) {
this.init = init;
}
public void setMax(long max) {
this.max = max;
}
public void setUsed(long used) {
this.used = used;
}
public void setCommitted(long committed) {
this.committed = committed;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getId() {
return id;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public long getTimeBucket() {
return timeBucket;
}
}
}
package org.skywalking.apm.collector.agentjvm.worker.memory.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class MemoryMetricEsTableDefine extends ElasticSearchTableDefine {
public MemoryMetricEsTableDefine() {
super(MemoryMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_INIT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_MAX, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_USED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_COMMITTED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.memory.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class MemoryMetricH2TableDefine extends H2TableDefine {
public MemoryMetricH2TableDefine() {
super(MemoryMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, H2ColumnDefine.Type.Boolean.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_INIT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_MAX, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_USED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_COMMITTED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.memory.define;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
*/
public class MemoryMetricTable extends CommonTable {
public static final String TABLE = "memory_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_IS_HEAP = "is_heap";
public static final String COLUMN_INIT = "init";
public static final String COLUMN_MAX = "max";
public static final String COLUMN_USED = "used";
public static final String COLUMN_COMMITTED = "committed";
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.IMemoryPoolMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class MemoryPoolMetricPersistenceWorker extends PersistenceWorker {
public MemoryPoolMetricPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IMemoryPoolMetricDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<MemoryPoolMetricPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public MemoryPoolMetricPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new MemoryPoolMetricPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return MemoryPoolMetricPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new MemoryPoolMetricDataDefine();
}
}
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.dao;
/**
* @author pengys5
*/
public interface IMemoryPoolMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public class MemoryPoolMetricEsDAO extends EsDAO implements IMemoryPoolMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryPoolMetricTable.COLUMN_POOL_TYPE, data.getDataInteger(1));
source.put(MemoryPoolMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
source.put(MemoryPoolMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryPoolMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryPoolMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryPoolMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
return getClient().prepareIndex(MemoryPoolMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class MemoryPoolMetricH2DAO extends H2DAO implements IMemoryPoolMetricDAO {
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.define;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class MemoryPoolMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() {
return 9;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(MemoryPoolMetricTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(MemoryPoolMetricTable.COLUMN_POOL_TYPE, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(MemoryPoolMetricTable.COLUMN_IS_HEAP, AttributeType.BOOLEAN, new CoverOperation()));
addAttribute(4, new Attribute(MemoryPoolMetricTable.COLUMN_INIT, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(MemoryPoolMetricTable.COLUMN_MAX, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(MemoryPoolMetricTable.COLUMN_USED, AttributeType.LONG, new CoverOperation()));
addAttribute(7, new Attribute(MemoryPoolMetricTable.COLUMN_COMMITTED, AttributeType.LONG, new CoverOperation()));
addAttribute(8, new Attribute(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
throw new UnexpectedException("memory pool metric data did not need send to remote worker.");
}
@Override public RemoteData serialize(Object object) {
throw new UnexpectedException("memory pool metric data did not need send to remote worker.");
}
public static class MemoryPoolMetric implements Transform<MemoryPoolMetric> {
private String id;
private int applicationInstanceId;
private int poolType;
private boolean isHeap;
private long init;
private long max;
private long used;
private long committed;
private long timeBucket;
public MemoryPoolMetric(String id, int applicationInstanceId, int poolType, boolean isHeap, long init, long max,
long used, long committed, long timeBucket) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.poolType = poolType;
this.isHeap = isHeap;
this.init = init;
this.max = max;
this.used = used;
this.committed = committed;
this.timeBucket = timeBucket;
}
public MemoryPoolMetric() {
}
@Override public Data toData() {
MemoryPoolMetricDataDefine define = new MemoryPoolMetricDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataInteger(1, this.poolType);
data.setDataBoolean(0, this.isHeap);
data.setDataLong(0, this.init);
data.setDataLong(1, this.max);
data.setDataLong(2, this.used);
data.setDataLong(3, this.committed);
data.setDataLong(4, this.timeBucket);
return data;
}
@Override public MemoryPoolMetric toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.poolType = data.getDataInteger(1);
this.isHeap = data.getDataBoolean(0);
this.init = data.getDataLong(0);
this.max = data.getDataLong(1);
this.used = data.getDataLong(2);
this.committed = data.getDataLong(3);
this.timeBucket = data.getDataLong(4);
return this;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}
public void setPoolType(int poolType) {
this.poolType = poolType;
}
public void setHeap(boolean heap) {
isHeap = heap;
}
public void setInit(long init) {
this.init = init;
}
public void setMax(long max) {
this.max = max;
}
public void setUsed(long used) {
this.used = used;
}
public void setCommitted(long committed) {
this.committed = committed;
}
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
public String getId() {
return id;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
}
public long getTimeBucket() {
return timeBucket;
}
}
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class MemoryPoolMetricEsTableDefine extends ElasticSearchTableDefine {
public MemoryPoolMetricEsTableDefine() {
super(MemoryPoolMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_POOL_TYPE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_INIT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_MAX, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_USED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_COMMITTED, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class MemoryPoolMetricH2TableDefine extends H2TableDefine {
public MemoryPoolMetricH2TableDefine() {
super(MemoryPoolMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_APPLICATION_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_POOL_TYPE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_IS_HEAP, H2ColumnDefine.Type.Boolean.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_INIT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_MAX, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_USED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_COMMITTED, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.define;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
*/
public class MemoryPoolMetricTable extends CommonTable {
public static final String TABLE = "memory_pool_metric";
public static final String COLUMN_APPLICATION_INSTANCE_ID = "application_instance_id";
public static final String COLUMN_POOL_TYPE = "pool_type";
public static final String COLUMN_IS_HEAP = "is_heap";
public static final String COLUMN_INIT = "init";
public static final String COLUMN_MAX = "max";
public static final String COLUMN_USED = "used";
public static final String COLUMN_COMMITTED = "committed";
}
org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricEsTableDefine
org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricH2TableDefine
org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricEsTableDefine
org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricH2TableDefine
org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricEsTableDefine
org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricH2TableDefine
org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricEsTableDefine
org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricH2TableDefine
\ No newline at end of file
package org.skywalking.apm.collector.agentjvm.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.GCPhrase;
import org.skywalking.apm.network.proto.JVMMetric;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.skywalking.apm.network.proto.PoolType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class JVMMetricsServiceHandlerTestCase {
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandlerTestCase.class);
private JVMMetricsServiceGrpc.JVMMetricsServiceBlockingStub stub;
public void test() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = JVMMetricsServiceGrpc.newBlockingStub(channel);
JVMMetrics.Builder jvmMetricsBuilder = JVMMetrics.newBuilder();
jvmMetricsBuilder.setApplicationInstanceId(1);
JVMMetric.Builder jvmMetric = JVMMetric.newBuilder();
jvmMetric.setTime(System.currentTimeMillis());
buildCpuMetric(jvmMetric);
buildMemoryMetric(jvmMetric);
buildMemoryPoolMetric(jvmMetric);
buildGcMetric(jvmMetric);
jvmMetricsBuilder.addMetrics(jvmMetric.build());
stub.collect(jvmMetricsBuilder.build());
}
private void buildCpuMetric(JVMMetric.Builder jvmMetric) {
CPU.Builder cpuBuilder = CPU.newBuilder();
cpuBuilder.setUsagePercent(70);
jvmMetric.setCpu(cpuBuilder);
}
private void buildMemoryMetric(JVMMetric.Builder jvmMetric) {
Memory.Builder heapMemoryBuilder = Memory.newBuilder();
heapMemoryBuilder.setIsHeap(true);
heapMemoryBuilder.setInit(20);
heapMemoryBuilder.setMax(100);
heapMemoryBuilder.setUsed(50);
heapMemoryBuilder.setCommitted(30);
jvmMetric.addMemory(heapMemoryBuilder.build());
Memory.Builder nonHeapMemoryBuilder = Memory.newBuilder();
nonHeapMemoryBuilder.setIsHeap(false);
nonHeapMemoryBuilder.setInit(200);
nonHeapMemoryBuilder.setMax(1000);
nonHeapMemoryBuilder.setUsed(500);
nonHeapMemoryBuilder.setCommitted(300);
jvmMetric.addMemory(nonHeapMemoryBuilder.build());
}
private void buildMemoryPoolMetric(JVMMetric.Builder jvmMetric) {
MemoryPool.Builder poolBuilder = MemoryPool.newBuilder();
poolBuilder.setType(PoolType.NEWGEN_USAGE);
poolBuilder.setIsHeap(true);
poolBuilder.setInit(20);
poolBuilder.setMax(100);
poolBuilder.setUsed(50);
poolBuilder.setCommited(30);
jvmMetric.addMemoryPool(poolBuilder.build());
}
private void buildGcMetric(JVMMetric.Builder jvmMetric) {
GC.Builder gcBuilder = GC.newBuilder();
gcBuilder.setPhrase(GCPhrase.NEW);
gcBuilder.setCount(2);
gcBuilder.setTime(100);
jvmMetric.addGc(gcBuilder.build());
}
}
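The harness above has no entry point of its own. A minimal way to run it against a collector listening on localhost:11800 (the runner class below is an assumption, not part of this commit):
public class JVMMetricsServiceHandlerTestRunner {
    public static void main(String[] args) {
        // Assumes a collector gRPC server is already up on localhost:11800.
        new JVMMetricsServiceHandlerTestCase().test();
    }
}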
......
@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......
......
@@ -5,7 +5,7 @@ import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
......@@ -2,14 +2,14 @@ package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
......@@ -2,12 +2,12 @@ package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.node.mapping.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
......@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
......@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.noderef.summary.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.segment.origin.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.service.entry;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.service.entry.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.serviceref.reference;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
......@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
import org.skywalking.apm.collector.stream.worker.storage.CommonTable;
/**
* @author pengys5
......
......@@ -9,4 +9,8 @@ org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricH2DAO
\ No newline at end of file
......@@ -20,6 +20,11 @@ org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenc
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterSerialWorker$Factory
\ No newline at end of file
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public class UnexpectedException extends RuntimeException {
public UnexpectedException(String message) {
super(message);
}
}
......@@ -11,6 +11,6 @@ public class ElasticSearchColumnDefine extends ColumnDefine {
}
public enum Type {
Binary, Boolean, Date, Keyword, Long, Integer
Binary, Boolean, Keyword, Long, Integer, Double
}
}
......@@ -12,6 +12,6 @@ public class H2ColumnDefine extends ColumnDefine {
}
public enum Type {
Boolean, Varchar, Int, Bigint, BINARY
Boolean, Varchar, Int, Bigint, BINARY, Double
}
}
......@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
* @author pengys5
*/
public enum AttributeType {
STRING, LONG, FLOAT, INTEGER, BYTE, BOOLEAN
STRING, LONG, DOUBLE, INTEGER, BYTE, BOOLEAN
}
......@@ -10,29 +10,29 @@ import org.skywalking.apm.collector.stream.worker.selector.AbstractHashMessage;
public class Data extends AbstractHashMessage {
private final int stringCapacity;
private final int longCapacity;
private final int floatCapacity;
private final int doubleCapacity;
private final int integerCapacity;
private final int booleanCapacity;
private final int byteCapacity;
private String[] dataStrings;
private Long[] dataLongs;
private Float[] dataFloats;
private Double[] dataDoubles;
private Integer[] dataIntegers;
private Boolean[] dataBooleans;
private byte[][] dataBytes;
public Data(String id, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity,
public Data(String id, int stringCapacity, int longCapacity, int doubleCapacity, int integerCapacity,
int booleanCapacity, int byteCapacity) {
super(id);
this.dataStrings = new String[stringCapacity];
this.dataLongs = new Long[longCapacity];
this.dataFloats = new Float[floatCapacity];
this.dataDoubles = new Double[doubleCapacity];
this.dataIntegers = new Integer[integerCapacity];
this.dataBooleans = new Boolean[booleanCapacity];
this.dataBytes = new byte[byteCapacity][];
this.stringCapacity = stringCapacity;
this.longCapacity = longCapacity;
this.floatCapacity = floatCapacity;
this.doubleCapacity = doubleCapacity;
this.integerCapacity = integerCapacity;
this.booleanCapacity = booleanCapacity;
this.byteCapacity = byteCapacity;
......@@ -46,8 +46,8 @@ public class Data extends AbstractHashMessage {
dataLongs[position] = value;
}
public void setDataFloat(int position, Float value) {
dataFloats[position] = value;
public void setDataDouble(int position, Double value) {
dataDoubles[position] = value;
}
public void setDataInteger(int position, Integer value) {
......@@ -70,8 +70,8 @@ public class Data extends AbstractHashMessage {
return dataLongs[position];
}
public Float getDataFloat(int position) {
return dataFloats[position];
public Double getDataDouble(int position) {
return dataDoubles[position];
}
public Integer getDataInteger(int position) {
......@@ -93,7 +93,7 @@ public class Data extends AbstractHashMessage {
public RemoteData serialize() {
RemoteData.Builder builder = RemoteData.newBuilder();
builder.setIntegerCapacity(integerCapacity);
builder.setFloatCapacity(floatCapacity);
builder.setDoubleCapacity(doubleCapacity);
builder.setStringCapacity(stringCapacity);
builder.setLongCapacity(longCapacity);
builder.setByteCapacity(byteCapacity);
......@@ -105,8 +105,8 @@ public class Data extends AbstractHashMessage {
for (int i = 0; i < dataIntegers.length; i++) {
builder.setDataIntegers(i, dataIntegers[i]);
}
for (int i = 0; i < dataFloats.length; i++) {
builder.setDataFloats(i, dataFloats[i]);
for (int i = 0; i < dataDoubles.length; i++) {
builder.setDataDoubles(i, dataDoubles[i]);
}
for (int i = 0; i < dataLongs.length; i++) {
builder.setDataLongs(i, dataLongs[i]);
......
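A caveat on the serialize() loops above: protobuf-java builders start with empty repeated fields, so the set*(index, value) calls throw IndexOutOfBoundsException on first use, and any array slot never written stays null, which would also fail on unboxing. A minimal corrected sketch for the new double path, assuming the generated RemoteData builder follows standard protobuf-java conventions (the zero default is illustrative):
for (int i = 0; i < dataDoubles.length; i++) {
// add* appends to the repeated field; map null slots to 0 so positions stay aligned
builder.addDataDoubles(dataDoubles[i] == null ? 0D : dataDoubles[i]);
}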
......@@ -9,7 +9,7 @@ public abstract class DataDefine {
private Attribute[] attributes;
private int stringCapacity;
private int longCapacity;
private int floatCapacity;
private int doubleCapacity;
private int integerCapacity;
private int booleanCapacity;
private int byteCapacity;
......@@ -26,8 +26,8 @@ public abstract class DataDefine {
stringCapacity++;
} else if (AttributeType.LONG.equals(attribute.getType())) {
longCapacity++;
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
floatCapacity++;
} else if (AttributeType.DOUBLE.equals(attribute.getType())) {
doubleCapacity++;
} else if (AttributeType.INTEGER.equals(attribute.getType())) {
integerCapacity++;
} else if (AttributeType.BOOLEAN.equals(attribute.getType())) {
......@@ -47,13 +47,13 @@ public abstract class DataDefine {
protected abstract void attributeDefine();
public final Data build(String id) {
return new Data(id, stringCapacity, longCapacity, floatCapacity, integerCapacity, booleanCapacity, byteCapacity);
return new Data(id, stringCapacity, longCapacity, doubleCapacity, integerCapacity, booleanCapacity, byteCapacity);
}
public void mergeData(Data newData, Data oldData) {
int stringPosition = 0;
int longPosition = 0;
int floatPosition = 0;
int doublePosition = 0;
int integerPosition = 0;
int booleanPosition = 0;
int bytePosition = 0;
......@@ -65,9 +65,9 @@ public abstract class DataDefine {
} else if (AttributeType.LONG.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataLong(longPosition), oldData.getDataLong(longPosition));
longPosition++;
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataFloat(floatPosition), oldData.getDataFloat(floatPosition));
floatPosition++;
} else if (AttributeType.DOUBLE.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataDouble(doublePosition), oldData.getDataDouble(doublePosition));
doublePosition++;
} else if (AttributeType.INTEGER.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataInteger(integerPosition), oldData.getDataInteger(integerPosition));
integerPosition++;
......
......@@ -8,7 +8,7 @@ public interface Operation {
Long operate(Long newValue, Long oldValue);
Float operate(Float newValue, Float oldValue);
Double operate(Double newValue, Double oldValue);
Integer operate(Integer newValue, Integer oldValue);
......
......@@ -15,7 +15,7 @@ public class AddOperation implements Operation {
return newValue + oldValue;
}
@Override public Float operate(Float newValue, Float oldValue) {
@Override public Double operate(Double newValue, Double oldValue) {
return newValue + oldValue;
}
......
......@@ -14,7 +14,7 @@ public class CoverOperation implements Operation {
return newValue;
}
@Override public Float operate(Float newValue, Float oldValue) {
@Override public Double operate(Double newValue, Double oldValue) {
return newValue;
}
......
......@@ -14,7 +14,7 @@ public class NonOperation implements Operation {
return oldValue;
}
@Override public Float operate(Float newValue, Float oldValue) {
@Override public Double operate(Double newValue, Double oldValue) {
return oldValue;
}
......
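Taken together, the three Operation implementations give each attribute its merge semantics when a new Data row is folded into a buffered one: AddOperation accumulates, CoverOperation lets the newest sample win, NonOperation keeps the first value. A minimal sketch of the new Double paths, assuming the classes above are on the classpath (the values are illustrative):
public class OperationDemo {
public static void main(String[] args) {
// merging newValue = 0.5 into oldValue = 0.25 under each strategy
System.out.println(new AddOperation().operate(0.5D, 0.25D)); // 0.75 — accumulate, e.g. a summed metric
System.out.println(new CoverOperation().operate(0.5D, 0.25D)); // 0.5 — newest wins, e.g. a gauge
System.out.println(new NonOperation().operate(0.5D, 0.25D)); // 0.25 — keep the existing value
}
}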
package org.skywalking.apm.collector.agentstream.worker;
package org.skywalking.apm.collector.stream.worker.util;
/**
* @author pengys5
......@@ -6,8 +6,6 @@ package org.skywalking.apm.collector.agentstream.worker;
public class Const {
public static final String ID_SPLIT = "..-..";
public static final String IDS_SPLIT = "\\.\\.-\\.\\.";
public static final String PEERS_FRONT_SPLIT = "[";
public static final String PEERS_BEHIND_SPLIT = "]";
public static final int USER_ID = 1;
public static final String USER_CODE = "User";
public static final String SEGMENT_SPAN_SPLIT = "S";
......
package org.skywalking.apm.collector.agentstream.worker.util;
package org.skywalking.apm.collector.stream.worker.util;
import java.text.SimpleDateFormat;
import java.util.Calendar;
......@@ -13,6 +13,7 @@ public enum TimeBucketUtils {
private final SimpleDateFormat dayDateFormat = new SimpleDateFormat("yyyyMMdd");
private final SimpleDateFormat hourDateFormat = new SimpleDateFormat("yyyyMMddHH");
private final SimpleDateFormat minuteDateFormat = new SimpleDateFormat("yyyyMMddHHmm");
private final SimpleDateFormat secondDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
public long getMinuteTimeBucket(long time) {
Calendar calendar = Calendar.getInstance();
......@@ -21,6 +22,13 @@ public enum TimeBucketUtils {
return Long.valueOf(timeStr);
}
public long getSecondTimeBucket(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = secondDateFormat.format(calendar.getTime());
return Long.valueOf(timeStr);
}
public long getHourTimeBucket(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
......
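One caution on the formatter added here: SimpleDateFormat is not thread-safe, and TimeBucketUtils is an enum singleton shared by concurrent gRPC handler threads, so the formatters can produce corrupted buckets under load. A minimal thread-safe sketch of the same method, assuming Java 8's java.time is available:
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
// DateTimeFormatter is immutable and safe to share between threads.
private static final DateTimeFormatter SECOND_FORMAT =
DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneId.systemDefault());
public long getSecondTimeBucket(long time) {
return Long.parseLong(SECOND_FORMAT.format(Instant.ofEpochMilli(time)));
}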
......@@ -16,13 +16,13 @@ message RemoteMessage {
message RemoteData {
int32 stringCapacity = 1;
int32 longCapacity = 2;
int32 floatCapacity = 3;
int32 doubleCapacity = 3;
int32 integerCapacity = 4;
int32 byteCapacity = 5;
int32 booleanCapacity = 6;
repeated string dataStrings = 7;
repeated int64 dataLongs = 8;
repeated float dataFloats = 9;
repeated double dataDoubles = 9;
repeated int32 dataIntegers = 10;
repeated bytes dataBytes = 11;
repeated bool dataBooleans = 12;
......
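Note on compatibility: renaming field 3 to doubleCapacity keeps its field number and varint encoding, so it stays wire-compatible, but changing repeated float dataFloats to repeated double dataDoubles at field 9 switches the packed payload from 4-byte to 8-byte entries. Collector nodes on different versions would misread each other's RemoteData, so both ends of the remote channel have to be upgraded together.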
package org.skywalking.apm.collector.stream.worker.util;
import java.util.Calendar;
import java.util.TimeZone;
import org.junit.Assert;
import org.junit.Test;
/**
* @author pengys5
*/
public class TimeBucketUtilsTestCase {
@Test
public void testUTCLocation() {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
long timeBucket = 201703310915L;
long changedTimeBucket = TimeBucketUtils.INSTANCE.changeToUTCTimeBucket(timeBucket);
Assert.assertEquals(201703310115L, changedTimeBucket);
}
@Test
public void testUTC8Location() {
TimeZone.setDefault(TimeZone.getTimeZone("GMT+08:00"));
long timeBucket = 201703310915L;
long changedTimeBucket = TimeBucketUtils.INSTANCE.changeToUTCTimeBucket(timeBucket);
Assert.assertEquals(201703310915L, changedTimeBucket);
}
@Test
public void testGetSecondTimeBucket() {
TimeZone.setDefault(TimeZone.getTimeZone("GMT+08:00"));
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(1490922929258L);
Assert.assertEquals(20170331091529L, timeBucket);
}
@Test
public void testSecondFieldAdjustment() {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(1490922929258L);
long before = calendar.getTimeInMillis();
// Winding the SECOND field back shifts the underlying timestamp by whole seconds.
calendar.set(Calendar.SECOND, calendar.get(Calendar.SECOND) - 3);
Assert.assertEquals(before - 3000L, calendar.getTimeInMillis());
}
}
......@@ -8,7 +8,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
......
......@@ -8,7 +8,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
......
......@@ -10,7 +10,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
......
......@@ -8,7 +8,7 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
......
......@@ -4,7 +4,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -4,7 +4,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.Const;
import org.skywalking.apm.collector.stream.worker.util.Const;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......