From b671d634205728cd5633b4ec43e46ce00ff0234e Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Wed, 16 Aug 2017 23:05:01 +0800 Subject: [PATCH] Update instance heart beat time that receive from jam metric service. #358 --- .../handler/JVMMetricsServiceHandler.java | 19 +++- .../InstHeartBeatPersistenceWorker.java | 71 +++++++++++++++ .../heartbeat/dao/IInstanceHeartBeatDAO.java | 7 ++ .../heartbeat/dao/InstanceHeartBeatEsDAO.java | 31 +++++++ .../heartbeat/dao/InstanceHeartBeatH2DAO.java | 9 ++ .../define/InstanceHeartBeatDataDefine.java | 91 +++++++++++++++++++ .../defines/local_worker_provider.define | 3 +- .../storage/table/register/InstanceTable.java | 2 +- 8 files changed, 230 insertions(+), 3 deletions(-) create mode 100644 apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/InstHeartBeatPersistenceWorker.java create mode 100644 apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/IInstanceHeartBeatDAO.java create mode 100644 apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatEsDAO.java create mode 100644 apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java create mode 100644 apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/define/InstanceHeartBeatDataDefine.java diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/handler/JVMMetricsServiceHandler.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/handler/JVMMetricsServiceHandler.java index 31edb587a1..15bc9d7c80 100644 --- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/handler/JVMMetricsServiceHandler.java +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/grpc/handler/JVMMetricsServiceHandler.java @@ -6,17 +6,19 @@ import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWork import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine; import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker; import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricDataDefine; +import org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker; +import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine; import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker; import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricDataDefine; import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker; import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricDataDefine; import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; -import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.network.proto.CPU; import org.skywalking.apm.network.proto.Downstream; @@ -42,6 +44,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); request.getMetricsList().forEach(metric -> { long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); + senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, time); sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu()); sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList()); sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList()); @@ -52,6 +55,20 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe responseObserver.onCompleted(); } + private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context, int applicationInstanceId, + long heartBeatTime) { + InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat(); + heartBeat.setId(String.valueOf(applicationInstanceId)); + heartBeat.setHeartbeatTime(heartBeatTime); + heartBeat.setApplicationInstanceId(applicationInstanceId); + try { + logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId()); + context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData()); + } catch (WorkerInvokeException | WorkerNotFoundException e) { + logger.error(e.getMessage(), e); + } + } + private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int applicationInstanceId, long timeBucket, CPU cpu) { CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric(); diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/InstHeartBeatPersistenceWorker.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/InstHeartBeatPersistenceWorker.java new file mode 100644 index 0000000000..289b133264 --- /dev/null +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/InstHeartBeatPersistenceWorker.java @@ -0,0 +1,71 @@ +package org.skywalking.apm.collector.agentjvm.worker.heartbeat; + +import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO; +import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine; +import org.skywalking.apm.collector.storage.dao.DAOContainer; +import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; +import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.Role; +import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; +import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; +import org.skywalking.apm.collector.stream.worker.selector.RollingSelector; +import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector; + +/** + * @author pengys5 + */ +public class InstHeartBeatPersistenceWorker extends PersistenceWorker { + + public InstHeartBeatPersistenceWorker(Role role, ClusterWorkerContext clusterContext) { + super(role, clusterContext); + } + + @Override public void preStart() throws ProviderNotFoundException { + super.preStart(); + } + + @Override protected boolean needMergeDBData() { + return false; + } + + @Override protected IPersistenceDAO persistenceDAO() { + return (IPersistenceDAO)DAOContainer.INSTANCE.get(InstanceHeartBeatEsDAO.class.getName()); + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + @Override + public Role role() { + return WorkerRole.INSTANCE; + } + + @Override + public InstHeartBeatPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) { + return new InstHeartBeatPersistenceWorker(role(), clusterContext); + } + + @Override + public int queueSize() { + return 1024; + } + } + + public enum WorkerRole implements Role { + INSTANCE; + + @Override + public String roleName() { + return InstHeartBeatPersistenceWorker.class.getSimpleName(); + } + + @Override + public WorkerSelector workerSelector() { + return new RollingSelector(); + } + + @Override public DataDefine dataDefine() { + return new InstanceHeartBeatDataDefine(); + } + } +} diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/IInstanceHeartBeatDAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/IInstanceHeartBeatDAO.java new file mode 100644 index 0000000000..c03e396587 --- /dev/null +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/IInstanceHeartBeatDAO.java @@ -0,0 +1,7 @@ +package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao; + +/** + * @author pengys5 + */ +public interface IInstanceHeartBeatDAO { +} \ No newline at end of file diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatEsDAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatEsDAO.java new file mode 100644 index 0000000000..3ced6633a3 --- /dev/null +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatEsDAO.java @@ -0,0 +1,31 @@ +package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao; + +import java.util.HashMap; +import java.util.Map; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; +import org.skywalking.apm.collector.storage.table.register.InstanceTable; +import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO; +import org.skywalking.apm.collector.stream.worker.impl.data.Data; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; + +/** + * @author pengys5 + */ +public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO { + + @Override public Data get(String id, DataDefine dataDefine) { + return null; + } + + @Override public IndexRequestBuilder prepareBatchInsert(Data data) { + return null; + } + + @Override public UpdateRequestBuilder prepareBatchUpdate(Data data) { + Map source = new HashMap<>(); + source.put(InstanceTable.COLUMN_REGISTER_TIME, data.getDataLong(0)); + return getClient().prepareUpdate(InstanceTable.TABLE, data.getDataString(0)).setDoc(source); + } +} diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java new file mode 100644 index 0000000000..2a63de395f --- /dev/null +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java @@ -0,0 +1,9 @@ +package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao; + +import org.skywalking.apm.collector.storage.h2.dao.H2DAO; + +/** + * @author pengys5 + */ +public class InstanceHeartBeatH2DAO extends H2DAO implements IInstanceHeartBeatDAO { +} diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/define/InstanceHeartBeatDataDefine.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/define/InstanceHeartBeatDataDefine.java new file mode 100644 index 0000000000..b91275fa4d --- /dev/null +++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/define/InstanceHeartBeatDataDefine.java @@ -0,0 +1,91 @@ +package org.skywalking.apm.collector.agentjvm.worker.heartbeat.define; + +import org.skywalking.apm.collector.core.framework.UnexpectedException; +import org.skywalking.apm.collector.remote.grpc.proto.RemoteData; +import org.skywalking.apm.collector.storage.table.register.InstanceTable; +import org.skywalking.apm.collector.stream.worker.impl.data.Attribute; +import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType; +import org.skywalking.apm.collector.stream.worker.impl.data.Data; +import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine; +import org.skywalking.apm.collector.stream.worker.impl.data.Transform; +import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation; +import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation; + +/** + * @author pengys5 + */ +public class InstanceHeartBeatDataDefine extends DataDefine { + + @Override protected int initialCapacity() { + return 3; + } + + @Override protected void attributeDefine() { + addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation())); + addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new NonOperation())); + addAttribute(2, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation())); + } + + @Override public Object deserialize(RemoteData remoteData) { + throw new UnexpectedException("instance heart beat data did not need send to remote worker."); + } + + @Override public RemoteData serialize(Object object) { + throw new UnexpectedException("instance heart beat data did not need send to remote worker."); + } + + public static class InstanceHeartBeat implements Transform { + private String id; + private int applicationInstanceId; + private long heartbeatTime; + + public InstanceHeartBeat(String id, int applicationInstanceId, long heartbeatTime) { + this.id = id; + this.applicationInstanceId = applicationInstanceId; + this.heartbeatTime = heartbeatTime; + } + + public InstanceHeartBeat() { + } + + @Override public Data toData() { + InstanceHeartBeatDataDefine define = new InstanceHeartBeatDataDefine(); + Data data = define.build(id); + data.setDataString(0, this.id); + data.setDataInteger(0, this.applicationInstanceId); + data.setDataLong(0, this.heartbeatTime); + return data; + } + + @Override public InstanceHeartBeat toSelf(Data data) { + this.id = data.getDataString(0); + this.applicationInstanceId = data.getDataInteger(0); + this.heartbeatTime = data.getDataLong(0); + return this; + } + + public void setId(String id) { + this.id = id; + } + + public void setApplicationInstanceId(int applicationInstanceId) { + this.applicationInstanceId = applicationInstanceId; + } + + public String getId() { + return id; + } + + public int getApplicationInstanceId() { + return applicationInstanceId; + } + + public long getHeartbeatTime() { + return heartbeatTime; + } + + public void setHeartbeatTime(long heartbeatTime) { + this.heartbeatTime = heartbeatTime; + } + } +} diff --git a/apm-collector/apm-collector-agentjvm/src/main/resources/META-INF/defines/local_worker_provider.define b/apm-collector/apm-collector-agentjvm/src/main/resources/META-INF/defines/local_worker_provider.define index aa00e14b79..6c21da7dbd 100644 --- a/apm-collector/apm-collector-agentjvm/src/main/resources/META-INF/defines/local_worker_provider.define +++ b/apm-collector/apm-collector-agentjvm/src/main/resources/META-INF/defines/local_worker_provider.define @@ -1,4 +1,5 @@ org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker$Factory org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker$Factory org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker$Factory -org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker$Factory \ No newline at end of file +org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker$Factory +org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker$Factory \ No newline at end of file diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/table/register/InstanceTable.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/table/register/InstanceTable.java index c1cf6e8358..bdcf0ff0ba 100644 --- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/table/register/InstanceTable.java +++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/table/register/InstanceTable.java @@ -11,5 +11,5 @@ public class InstanceTable extends CommonTable { public static final String COLUMN_AGENT_UUID = "agent_uuid"; public static final String COLUMN_REGISTER_TIME = "register_time"; public static final String COLUMN_INSTANCE_ID = "instance_id"; - public static final String COLUMN_HEARTBEAT_TIME = "heartbeatTime"; + public static final String COLUMN_HEARTBEAT_TIME = "heartbeat_time"; } -- GitLab