提交 ec08a291 编写于 作者: P peng-yongsheng

Instance heart beat implement and test successful.

上级 8361f415
......@@ -60,6 +60,11 @@
<artifactId>jvm-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>metric-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>register-define</artifactId>
......
......@@ -28,6 +28,7 @@ import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ServiceNa
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingListener;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
......@@ -87,7 +88,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AnalysisSegmentParserModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AnalysisSegmentParserModule.NAME, AnalysisMetricModule.NAME};
}
private void addHandlers(Server gRPCServer) {
......
......@@ -21,13 +21,17 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.apache.skywalking.apm.network.proto.ApplicationInstance;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.apache.skywalking.apm.network.proto.Downstream;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.OSInfo;
import org.slf4j.Logger;
......@@ -41,13 +45,16 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private final IInstanceIDService instanceIDService;
private final IInstanceHeartBeatService instanceHeartBeatService;
public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
this.instanceIDService = moduleManager.find(AnalysisRegisterModule.NAME).getService(IInstanceIDService.class);
this.instanceHeartBeatService = moduleManager.find(AnalysisMetricModule.NAME).getService(IInstanceHeartBeatService.class);
}
@Override
public void registerInstance(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
public void registerInstance(ApplicationInstance request,
StreamObserver<ApplicationInstanceMapping> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
int instanceId = instanceIDService.getOrCreateByAgentUUID(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
......@@ -57,6 +64,12 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
responseObserver.onCompleted();
}
@Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
long heartBeatTime = request.getHeartbeatTime();
this.instanceHeartBeatService.heartBeat(instanceId, heartBeatTime);
}
private String buildOsInfo(OSInfo osinfo) {
JsonObject osInfoJson = new JsonObject();
osInfoJson.addProperty("osName", osinfo.getOsName());
......
......@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.skywalking.apm.collector.analysis.jvm.define.AnalysisJVMModule;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryPoolMetricService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
......@@ -50,14 +49,12 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
private final IGCMetricService gcMetricService;
private final IMemoryMetricService memoryMetricService;
private final IMemoryPoolMetricService memoryPoolMetricService;
private final IInstanceHeartBeatService instanceHeartBeatService;
public JVMMetricsServiceHandler(ModuleManager moduleManager) {
this.cpuMetricService = moduleManager.find(AnalysisJVMModule.NAME).getService(ICpuMetricService.class);
this.gcMetricService = moduleManager.find(AnalysisJVMModule.NAME).getService(IGCMetricService.class);
this.memoryMetricService = moduleManager.find(AnalysisJVMModule.NAME).getService(IMemoryMetricService.class);
this.memoryPoolMetricService = moduleManager.find(AnalysisJVMModule.NAME).getService(IMemoryPoolMetricService.class);
this.instanceHeartBeatService = moduleManager.find(AnalysisJVMModule.NAME).getService(IInstanceHeartBeatService.class);
}
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
......@@ -66,7 +63,6 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
// sendToInstanceHeartBeatService(instanceId, metric.getTime());
sendToCpuMetricService(instanceId, time, metric.getCpu());
sendToMemoryMetricService(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricService(instanceId, time, metric.getMemoryPoolList());
......@@ -77,10 +73,6 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
responseObserver.onCompleted();
}
private void sendToInstanceHeartBeatService(int instanceId, long heartBeatTime) {
instanceHeartBeatService.send(instanceId, heartBeatTime);
}
private void sendToMemoryMetricService(int instanceId, long timeBucket, List<Memory> memories) {
memories.forEach(memory -> memoryMetricService.send(instanceId, timeBucket, memory.getIsHeap(), memory.getInit(), memory.getMax(), memory.getUsed(), memory.getCommitted()));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
/**
* @author peng-yongsheng
*/
public class InstHeartBeatServiceTestCase {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub blockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
ApplicationInstanceHeartbeat.Builder builder = ApplicationInstanceHeartbeat.newBuilder();
builder.setApplicationInstanceId(2);
builder.setHeartbeatTime(System.currentTimeMillis());
blockingStub.heartbeat(builder.build());
}
}
......@@ -20,7 +20,6 @@ package org.apache.skywalking.apm.collector.analysis.jvm.define;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryPoolMetricService;
import org.apache.skywalking.apm.collector.core.module.Module;
......@@ -38,7 +37,7 @@ public class AnalysisJVMModule extends Module {
@Override public Class[] services() {
return new Class[] {
ICpuMetricService.class, IGCMetricService.class, IInstanceHeartBeatService.class,
ICpuMetricService.class, IGCMetricService.class,
IMemoryMetricService.class, IMemoryPoolMetricService.class
};
}
......
......@@ -24,7 +24,6 @@ package org.apache.skywalking.apm.collector.analysis.jvm.define.graph;
public class GraphIdDefine {
public static final int CPU_METRIC_PERSISTENCE_GRAPH_ID = 300;
public static final int GC_METRIC_PERSISTENCE_GRAPH_ID = 301;
public static final int INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID = 302;
public static final int MEMORY_METRIC_PERSISTENCE_GRAPH_ID = 303;
public static final int MEMORY_POOL_METRIC_PERSISTENCE_GRAPH_ID = 304;
}
......@@ -65,6 +65,4 @@ public class WorkerIdDefine {
public static final int MEMORY_POOL_DAY_METRIC_TRANSFORM_NODE_ID = 3307;
public static final int MEMORY_POOL_MONTH_METRIC_PERSISTENCE_WORKER_ID = 3308;
public static final int MEMORY_POOL_MONTH_METRIC_TRANSFORM_NODE_ID = 3309;
public static final int INST_HEART_BEAT_PERSISTENCE_WORKER_ID = 302;
}
......@@ -22,17 +22,14 @@ import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.jvm.define.AnalysisJVMModule;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryPoolMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.CpuMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.GCMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.InstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.MemoryMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.MemoryPoolMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.cpu.CpuMetricPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.gc.GCMetricPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.InstanceHeartBeatPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.memory.MemoryMetricPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.memorypool.MemoryPoolMetricPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
......@@ -61,7 +58,6 @@ public class AnalysisJVMModuleProvider extends ModuleProvider {
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(ICpuMetricService.class, new CpuMetricService());
this.registerServiceImplementation(IGCMetricService.class, new GCMetricService());
this.registerServiceImplementation(IInstanceHeartBeatService.class, new InstanceHeartBeatService());
this.registerServiceImplementation(IMemoryMetricService.class, new MemoryMetricService());
this.registerServiceImplementation(IMemoryPoolMetricService.class, new MemoryPoolMetricService());
}
......@@ -90,9 +86,6 @@ public class AnalysisJVMModuleProvider extends ModuleProvider {
GCMetricPersistenceGraph gcMetricPersistenceGraph = new GCMetricPersistenceGraph(getManager(), workerCreateListener);
gcMetricPersistenceGraph.create();
InstanceHeartBeatPersistenceGraph instanceHeartBeatPersistenceGraph = new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener);
instanceHeartBeatPersistenceGraph.create();
MemoryMetricPersistenceGraph memoryMetricPersistenceGraph = new MemoryMetricPersistenceGraph(getManager(), workerCreateListener);
memoryMetricPersistenceGraph.create();
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.collector.analysis.metric.define;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.core.module.Module;
/**
......@@ -32,6 +33,6 @@ public class AnalysisMetricModule extends Module {
}
@Override public Class[] services() {
return new Class[] {};
return new Class[] {IInstanceHeartBeatService.class};
}
}
......@@ -32,8 +32,9 @@ public class MetricGraphIdDefine {
public static final int APPLICATION_COMPONENT_GRAPH_ID = 406;
public static final int APPLICATION_MAPPING_GRAPH_ID = 407;
public static final int SERVICE_ENTRY_GRAPH_ID = 408;
public static final int GLOBAL_TRACE_GRAPH_ID = 409;
public static final int SEGMENT_COST_GRAPH_ID = 410;
public static final int INSTANCE_MAPPING_GRAPH_ID = 411;
public static final int INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID = 412;
}
......@@ -112,10 +112,6 @@ public class MetricWorkerIdDefine {
public static final int APPLICATION_COMPONENT_MONTH_PERSISTENCE_WORKER_ID = 4907;
public static final int APPLICATION_COMPONENT_MONTH_TRANSFORM_NODE_ID = 4908;
public static final int SERVICE_ENTRY_AGGREGATION_WORKER_ID = 424;
public static final int SERVICE_ENTRY_REMOTE_WORKER_ID = 425;
public static final int SERVICE_ENTRY_PERSISTENCE_WORKER_ID = 426;
public static final int GLOBAL_TRACE_PERSISTENCE_WORKER_ID = 427;
public static final int SEGMENT_COST_PERSISTENCE_WORKER_ID = 428;
......@@ -125,4 +121,5 @@ public class MetricWorkerIdDefine {
public static final int INSTANCE_METRIC_GRAPH_BRIDGE_WORKER_ID = 432;
public static final int APPLICATION_METRIC_GRAPH_BRIDGE_WORKER_ID = 433;
public static final int INST_HEART_BEAT_PERSISTENCE_WORKER_ID = 400;
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.jvm.define.service;
package org.apache.skywalking.apm.collector.analysis.metric.define.service;
import org.apache.skywalking.apm.collector.core.module.Service;
......@@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.core.module.Service;
* @author peng-yongsheng
*/
public interface IInstanceHeartBeatService extends Service {
void send(int instanceId, long heartBeatTime);
void heartBeat(int instanceId, long heartBeatTime);
}
......@@ -20,6 +20,8 @@ package org.apache.skywalking.apm.collector.analysis.metric.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.metric.provider.service.InstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.ApplicationComponentGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.ApplicationComponentSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping.ApplicationMappingGraph;
......@@ -28,6 +30,7 @@ import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.appli
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.refmetric.ApplicationReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.heartbeat.InstanceHeartBeatPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.InstanceMappingGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.InstanceMappingSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric.InstanceMetricGraph;
......@@ -61,6 +64,7 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(IInstanceHeartBeatService.class, new InstanceHeartBeatService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......@@ -125,5 +129,8 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
SegmentCostGraph segmentCostGraph = new SegmentCostGraph(getManager(), workerCreateListener);
segmentCostGraph.create();
InstanceHeartBeatPersistenceGraph instanceHeartBeatPersistenceGraph = new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener);
instanceHeartBeatPersistenceGraph.create();
}
}
......@@ -16,10 +16,10 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.jvm.provider.service;
package org.apache.skywalking.apm.collector.analysis.metric.provider.service;
import org.apache.skywalking.apm.collector.analysis.jvm.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
......@@ -39,12 +39,12 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService {
private Graph<Instance> getHeartBeatGraph() {
if (ObjectUtils.isEmpty(heartBeatGraph)) {
this.heartBeatGraph = GraphManager.INSTANCE.findGraph(GraphIdDefine.INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID, Instance.class);
this.heartBeatGraph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID, Instance.class);
}
return heartBeatGraph;
}
@Override public void send(int instanceId, long heartBeatTime) {
@Override public void heartBeat(int instanceId, long heartBeatTime) {
Instance instance = new Instance();
instance.setId(String.valueOf(instanceId));
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
......
......@@ -16,9 +16,9 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.jvm.provider.worker;
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.heartbeat;
import org.apache.skywalking.apm.collector.analysis.jvm.define.graph.WorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
......@@ -37,7 +37,7 @@ public class InstHeartBeatPersistenceWorker extends PersistenceWorker<Instance>
}
@Override public int id() {
return WorkerIdDefine.INST_HEART_BEAT_PERSISTENCE_WORKER_ID;
return MetricWorkerIdDefine.INST_HEART_BEAT_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
......
......@@ -16,9 +16,9 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.jvm.provider.worker;
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.heartbeat;
import org.apache.skywalking.apm.collector.analysis.jvm.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
......@@ -38,7 +38,7 @@ public class InstanceHeartBeatPersistenceGraph {
}
public void create() {
GraphManager.INSTANCE.createIfAbsent(GraphIdDefine.INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID, Instance.class)
GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID, Instance.class)
.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
}
}
......@@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.register.Instance;
/**
* @author peng-yongsheng
*/
public interface IInstanceHeartBeatPersistenceDAO<Insert, Update, DataImpl extends Instance> extends IPersistenceDAO<Insert, Update, DataImpl> {
public interface IInstanceHeartBeatPersistenceDAO<INSERT, UPDATE, STREAM_DATA extends Instance> extends IPersistenceDAO<INSERT, UPDATE, STREAM_DATA> {
}
......@@ -46,11 +46,12 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
@Override public Instance get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceTable.TABLE, id).get();
if (getResponse.isExists()) {
Map<String, Object> source = getResponse.getSource();
Instance instance = new Instance();
instance.setId(id);
Map<String, Object> source = getResponse.getSource();
instance.setInstanceId((Integer)source.get(InstanceTable.COLUMN_INSTANCE_ID));
instance.setHeartBeatTime((Long)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME));
instance.setInstanceId(((Number)source.get(InstanceTable.COLUMN_INSTANCE_ID)).intValue());
instance.setHeartBeatTime(((Number)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME)).longValue());
logger.debug("getId: {} is exists", id);
return instance;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册