提交 f1f51903 编写于 作者: P peng-yongsheng

Redefine graph id and node id.

上级 4b78b860
...@@ -42,7 +42,7 @@ public class SegmentPost { ...@@ -42,7 +42,7 @@ public class SegmentPost {
JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json"); JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json");
JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json"); JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json");
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1; i++) {
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString()); HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString()); HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
} }
......
...@@ -15,25 +15,6 @@ ...@@ -15,25 +15,6 @@
], ],
"ai": 2, "ai": 2,
"ii": 2, "ii": 2,
"rs": [
{
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"si": 1,
"vi": 0,
"vn": "/dubbox-case/case/dubbox-rest",
"ni": 0,
"nn": "172.25.0.4:20880",
"ea": 2,
"ei": 0,
"en": "/dubbox-case/case/dubbox-rest",
"rn": 0
}
],
"ss": [ "ss": [
{ {
"si": 0, "si": 0,
...@@ -49,6 +30,25 @@ ...@@ -49,6 +30,25 @@
"pi": 0, "pi": 0,
"pn": "", "pn": "",
"ie": false, "ie": false,
"rs": [
{
"ts": [
230150,
185809,
24040000
],
"ai": -1,
"si": 1,
"vi": 0,
"vn": "/dubbox-case/case/dubbox-rest",
"ni": 0,
"nn": "172.25.0.4:20880",
"ea": 2,
"ei": 0,
"en": "/dubbox-case/case/dubbox-rest",
"rn": 0
}
],
"to": [ "to": [
{ {
"k": "url", "k": "url",
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.graph;
/**
* @author peng-yongsheng
*/
public class ApplicationGraphNodeIdDefine {
public static final int APPLICATION_REFERENCE_METRIC_AGGREGATION_NODE_ID = 1001;
public static final int APPLICATION_REFERENCE_METRIC_REMOTE_NODE_ID = 1002;
public static final int APPLICATION_REFERENCE_METRIC_PERSISTENCE_NODE_ID = 1003;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.graph;
/**
* @author peng-yongsheng
*/
public class InstanceGraphNodeIdDefine {
public static final int INSTANCE_REFERENCE_METRIC_AGGREGATION_NODE_ID = 2001;
public static final int INSTANCE_REFERENCE_METRIC_REMOTE_NODE_ID = 2002;
public static final int INSTANCE_REFERENCE_METRIC_PERSISTENCE_NODE_ID = 2003;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.graph;
/**
* @author peng-yongsheng
*/
public class JvmMetricStreamGraphDefine {
public static final int GC_METRIC_GRAPH_ID = 100;
public static final int MEMORY_METRIC_GRAPH_ID = 101;
public static final int MEMORY_POOL_METRIC_GRAPH_ID = 102;
public static final int CPU_METRIC_GRAPH_ID = 103;
public static final int INST_HEART_BEAT_GRAPH_ID = 104;
}
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
* Project repository: https://github.com/OpenSkywalking/skywalking * Project repository: https://github.com/OpenSkywalking/skywalking
*/ */
package org.skywalking.apm.collector.agent.stream.parser; package org.skywalking.apm.collector.agent.stream.service.graph;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface RefsListener extends SpanListener { public class RegisterStreamGraphDefine {
void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, String segmentId); public static final int APPLICATION_REGISTER_GRAPH_ID = 200;
public static final int INSTANCE_REGISTER_GRAPH_ID = 201;
public static final int SERVICE_NAME_REGISTER_GRAPH_ID = 202;
} }
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.graph;
/**
* @author peng-yongsheng
*/
public class ServiceGraphNodeIdDefine {
public static final int SERVICE_REFERENCE_METRIC_AGGREGATION_NODE_ID = 3001;
public static final int SERVICE_REFERENCE_METRIC_REMOTE_NODE_ID = 3002;
public static final int SERVICE_REFERENCE_METRIC_PERSISTENCE_NODE_ID = 3003;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.trace;
/**
* @author peng-yongsheng
*/
public enum MetricSource {
Exit, Entry
}
...@@ -45,7 +45,6 @@ public class AgentStreamBootStartup { ...@@ -45,7 +45,6 @@ public class AgentStreamBootStartup {
PersistenceTimer timer = new PersistenceTimer(); PersistenceTimer timer = new PersistenceTimer();
timer.start(moduleManager, workerCreateListener.getPersistenceWorkers()); timer.start(moduleManager, workerCreateListener.getPersistenceWorkers());
} }
private void createJVMGraph() { private void createJVMGraph() {
...@@ -71,10 +70,11 @@ public class AgentStreamBootStartup { ...@@ -71,10 +70,11 @@ public class AgentStreamBootStartup {
traceStreamGraph.createInstanceMetricGraph(); traceStreamGraph.createInstanceMetricGraph();
traceStreamGraph.createApplicationComponentGraph(); traceStreamGraph.createApplicationComponentGraph();
traceStreamGraph.createApplicationMappingGraph(); traceStreamGraph.createApplicationMappingGraph();
traceStreamGraph.createApplicationReferenceMetricGraph(); // traceStreamGraph.createApplicationReferenceMetricGraph();
traceStreamGraph.createServiceEntryGraph(); traceStreamGraph.createServiceEntryGraph();
traceStreamGraph.createServiceReferenceGraph();
traceStreamGraph.createSegmentGraph(); traceStreamGraph.createSegmentGraph();
traceStreamGraph.createSegmentCostGraph(); traceStreamGraph.createSegmentCostGraph();
traceStreamGraph.createServiceReferenceGraph();
} }
} }
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.graph;
/**
* @author peng-yongsheng
*/
public interface GraphDefine {
}
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.graph; package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker;
...@@ -40,12 +41,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; ...@@ -40,12 +41,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
*/ */
public class JvmMetricStreamGraph { public class JvmMetricStreamGraph {
public static final int GC_METRIC_GRAPH_ID = 100;
public static final int MEMORY_METRIC_GRAPH_ID = 101;
public static final int MEMORY_POOL_METRIC_GRAPH_ID = 102;
public static final int CPU_METRIC_GRAPH_ID = 103;
public static final int INST_HEART_BEAT_GRAPH_ID = 104;
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener; private final WorkerCreateListener workerCreateListener;
...@@ -58,7 +53,7 @@ public class JvmMetricStreamGraph { ...@@ -58,7 +53,7 @@ public class JvmMetricStreamGraph {
public void createGcMetricGraph() { public void createGcMetricGraph() {
QueueCreatorService<GCMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<GCMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class); Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.GC_METRIC_GRAPH_ID, GCMetric.class);
graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
...@@ -66,7 +61,7 @@ public class JvmMetricStreamGraph { ...@@ -66,7 +61,7 @@ public class JvmMetricStreamGraph {
public void createCpuMetricGraph() { public void createCpuMetricGraph() {
QueueCreatorService<CpuMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<CpuMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class); Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.CPU_METRIC_GRAPH_ID, CpuMetric.class);
graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
...@@ -74,7 +69,7 @@ public class JvmMetricStreamGraph { ...@@ -74,7 +69,7 @@ public class JvmMetricStreamGraph {
public void createMemoryMetricGraph() { public void createMemoryMetricGraph() {
QueueCreatorService<MemoryMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<MemoryMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
...@@ -82,7 +77,7 @@ public class JvmMetricStreamGraph { ...@@ -82,7 +77,7 @@ public class JvmMetricStreamGraph {
public void createMemoryPoolMetricGraph() { public void createMemoryPoolMetricGraph() {
QueueCreatorService<MemoryPoolMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<MemoryPoolMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
...@@ -90,7 +85,7 @@ public class JvmMetricStreamGraph { ...@@ -90,7 +85,7 @@ public class JvmMetricStreamGraph {
public void createHeartBeatGraph() { public void createHeartBeatGraph() {
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class); Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.INST_HEART_BEAT_GRAPH_ID, Instance.class);
graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.graph; package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterRemoteWorker; import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker; import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceRegisterRemoteWorker; import org.skywalking.apm.collector.agent.stream.worker.register.InstanceRegisterRemoteWorker;
...@@ -41,10 +42,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; ...@@ -41,10 +42,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
*/ */
public class RegisterStreamGraph { public class RegisterStreamGraph {
public static final int APPLICATION_REGISTER_GRAPH_ID = 200;
public static final int INSTANCE_REGISTER_GRAPH_ID = 201;
public static final int SERVICE_NAME_REGISTER_GRAPH_ID = 202;
private final ModuleManager moduleManager; private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener; private final WorkerCreateListener workerCreateListener;
...@@ -59,8 +56,8 @@ public class RegisterStreamGraph { ...@@ -59,8 +56,8 @@ public class RegisterStreamGraph {
QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<Application> graph = GraphManager.INSTANCE.createIfAbsent(APPLICATION_REGISTER_GRAPH_ID, Application.class); Graph<Application> graph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.APPLICATION_REGISTER_GRAPH_ID, Application.class);
graph.addNode(new ApplicationRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REGISTER_GRAPH_ID).create(workerCreateListener)) graph.addNode(new ApplicationRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, RegisterStreamGraphDefine.APPLICATION_REGISTER_GRAPH_ID).create(workerCreateListener))
.addNext(new ApplicationRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); .addNext(new ApplicationRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
...@@ -70,8 +67,8 @@ public class RegisterStreamGraph { ...@@ -70,8 +67,8 @@ public class RegisterStreamGraph {
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INSTANCE_REGISTER_GRAPH_ID, Instance.class); Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
graph.addNode(new InstanceRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, INSTANCE_REGISTER_GRAPH_ID).create(workerCreateListener)) graph.addNode(new InstanceRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, RegisterStreamGraphDefine.INSTANCE_REGISTER_GRAPH_ID).create(workerCreateListener))
.addNext(new InstanceRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); .addNext(new InstanceRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
...@@ -81,8 +78,8 @@ public class RegisterStreamGraph { ...@@ -81,8 +78,8 @@ public class RegisterStreamGraph {
QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<ServiceName> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); Graph<ServiceName> graph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class);
graph.addNode(new ServiceNameRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_NAME_REGISTER_GRAPH_ID).create(workerCreateListener)) graph.addNode(new ServiceNameRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, RegisterStreamGraphDefine.SERVICE_NAME_REGISTER_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceNameRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); .addNext(new ServiceNameRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
} }
...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.graph; ...@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardization; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardization;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker;
import org.skywalking.apm.collector.agent.stream.service.graph.ServiceGraphNodeIdDefine;
import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentAggregationWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentRemoteWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationComponentRemoteWorker;
...@@ -31,6 +32,9 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.application.Applic ...@@ -31,6 +32,9 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.application.Applic
import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationReferenceMetricRemoteWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.application.ApplicationReferenceMetricRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceMetricPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceReferenceMetricAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceReferenceMetricRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceReferencePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker;
...@@ -41,6 +45,7 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceRef ...@@ -41,6 +45,7 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceRef
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceReferenceMetricRemoteWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceReferenceMetricRemoteWorker;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.graph.Node;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.QueueModule; import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.queue.service.QueueCreatorService;
...@@ -51,6 +56,7 @@ import org.skywalking.apm.collector.storage.table.application.ApplicationMapping ...@@ -51,6 +56,7 @@ import org.skywalking.apm.collector.storage.table.application.ApplicationMapping
import org.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric; import org.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace; import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.skywalking.apm.collector.storage.table.instance.InstanceMetric; import org.skywalking.apm.collector.storage.table.instance.InstanceMetric;
import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
import org.skywalking.apm.collector.storage.table.segment.Segment; import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost; import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry; import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
...@@ -61,7 +67,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; ...@@ -61,7 +67,6 @@ import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class TraceStreamGraph { public class TraceStreamGraph {
public static final int GLOBAL_TRACE_GRAPH_ID = 300; public static final int GLOBAL_TRACE_GRAPH_ID = 300;
public static final int INSTANCE_METRIC_GRAPH_ID = 301; public static final int INSTANCE_METRIC_GRAPH_ID = 301;
public static final int APPLICATION_COMPONENT_GRAPH_ID = 302; public static final int APPLICATION_COMPONENT_GRAPH_ID = 302;
...@@ -127,17 +132,6 @@ public class TraceStreamGraph { ...@@ -127,17 +132,6 @@ public class TraceStreamGraph {
.addNext(new ApplicationMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); .addNext(new ApplicationMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
@SuppressWarnings("unchecked")
public void createApplicationReferenceMetricGraph() {
QueueCreatorService<ApplicationReferenceMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Graph<ApplicationReferenceMetric> graph = GraphManager.INSTANCE.createIfAbsent(APPLICATION_REFERENCE_METRIC_GRAPH_ID, ApplicationReferenceMetric.class);
graph.addNode(new ApplicationReferenceMetricAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ApplicationReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener))
.addNext(new ApplicationReferenceMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void createServiceEntryGraph() { public void createServiceEntryGraph() {
QueueCreatorService<ServiceEntry> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); QueueCreatorService<ServiceEntry> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
...@@ -158,6 +152,33 @@ public class TraceStreamGraph { ...@@ -158,6 +152,33 @@ public class TraceStreamGraph {
graph.addNode(new ServiceReferenceMetricAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) graph.addNode(new ServiceReferenceMetricAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ServiceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener)) .addNext(new ServiceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceReferenceMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); .addNext(new ServiceReferenceMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
createInstanceReferenceGraph(graph);
}
@SuppressWarnings("unchecked")
private void createInstanceReferenceGraph(Graph<ServiceReferenceMetric> graph) {
QueueCreatorService<ServiceReferenceMetric> aggregationQueueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
QueueCreatorService<InstanceReferenceMetric> persistenceQueueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Node<?, ServiceReferenceMetric> serviceReferenceMetricNode = graph.toFinder().findNode(ServiceGraphNodeIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_NODE_ID, ServiceReferenceMetric.class);
serviceReferenceMetricNode.addNext(new InstanceReferenceMetricAggregationWorker.Factory(moduleManager, aggregationQueueCreatorService).create(workerCreateListener))
.addNext(new InstanceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new InstanceReferencePersistenceWorker.Factory(moduleManager, persistenceQueueCreatorService).create(workerCreateListener));
createApplicationReferenceMetricGraph(graph);
}
@SuppressWarnings("unchecked")
private void createApplicationReferenceMetricGraph(Graph<ServiceReferenceMetric> graph) {
QueueCreatorService<ApplicationReferenceMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
// Node<?, ServiceReferenceMetric> serviceReferenceMetricNode = graph.toFinder().findNode(ServiceGraphNodeIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_NODE_ID, ServiceReferenceMetric.class);
// graph.addNode(new ApplicationReferenceMetricAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
// .addNext(new ApplicationReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener))
// .addNext(new ApplicationReferenceMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
......
...@@ -36,6 +36,7 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceM ...@@ -36,6 +36,7 @@ import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstanceM
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceReferenceMetricSpanListener; import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceReferenceMetricSpanListener;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
...@@ -119,6 +120,7 @@ public class SegmentParse { ...@@ -119,6 +120,7 @@ public class SegmentParse {
int applicationId = segmentDecorator.getApplicationId(); int applicationId = segmentDecorator.getApplicationId();
int applicationInstanceId = segmentDecorator.getApplicationInstanceId(); int applicationInstanceId = segmentDecorator.getApplicationInstanceId();
int entrySpanCount = 0;
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) { for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i); SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
...@@ -130,11 +132,21 @@ public class SegmentParse { ...@@ -130,11 +132,21 @@ public class SegmentParse {
if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, applicationId)) { if (!ReferenceIdExchanger.getInstance(moduleManager).exchange(referenceDecorator, applicationId)) {
return false; return false;
} }
notifyRefsListener(referenceDecorator, applicationId, applicationInstanceId, segmentId);
} }
} }
if (SpanType.Entry.equals(spanDecorator.getSpanType())) {
entrySpanCount++;
}
if (entrySpanCount > 1) {
throw new UnexpectedException("This segment contains multiple entry span.");
}
}
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
if (spanDecorator.getSpanId() == 0) { if (spanDecorator.getSpanId() == 0) {
notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId); notifyFirstListener(spanDecorator, applicationId, applicationInstanceId, segmentId);
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
...@@ -210,15 +222,6 @@ public class SegmentParse { ...@@ -210,15 +222,6 @@ public class SegmentParse {
} }
} }
private void notifyRefsListener(ReferenceDecorator reference, int applicationId, int applicationInstanceId,
String segmentId) {
for (SpanListener listener : spanListeners) {
if (listener instanceof RefsListener) {
((RefsListener)listener).parseRef(reference, applicationId, applicationInstanceId, segmentId);
}
}
}
private void notifyGlobalsListener(UniqueId uniqueId) { private void notifyGlobalsListener(UniqueId uniqueId) {
for (SpanListener listener : spanListeners) { for (SpanListener listener : spanListeners) {
if (listener instanceof GlobalTraceIdsListener) { if (listener instanceof GlobalTraceIdsListener) {
......
...@@ -191,7 +191,11 @@ public class SpanDecorator implements StandardBuilder { ...@@ -191,7 +191,11 @@ public class SpanDecorator implements StandardBuilder {
} }
public int getRefsCount() { public int getRefsCount() {
return spanObject.getRefsCount(); if (isOrigin) {
return spanObject.getRefsCount();
} else {
return spanBuilder.getRefsCount();
}
} }
public ReferenceDecorator getRefs(int index) { public ReferenceDecorator getRefs(int index) {
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm; package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService; import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
...@@ -39,7 +39,7 @@ public class CpuMetricService implements ICpuMetricService { ...@@ -39,7 +39,7 @@ public class CpuMetricService implements ICpuMetricService {
private Graph<CpuMetric> getCpuMetricGraph() { private Graph<CpuMetric> getCpuMetricGraph() {
if (ObjectUtils.isEmpty(cpuMetricGraph)) { if (ObjectUtils.isEmpty(cpuMetricGraph)) {
cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID, CpuMetric.class); cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.CPU_METRIC_GRAPH_ID, CpuMetric.class);
} }
return cpuMetricGraph; return cpuMetricGraph;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm; package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService; import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
...@@ -39,7 +39,7 @@ public class GCMetricService implements IGCMetricService { ...@@ -39,7 +39,7 @@ public class GCMetricService implements IGCMetricService {
private Graph<GCMetric> getGcMetricGraph() { private Graph<GCMetric> getGcMetricGraph() {
if (ObjectUtils.isEmpty(gcMetricGraph)) { if (ObjectUtils.isEmpty(gcMetricGraph)) {
gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID, GCMetric.class); gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.GC_METRIC_GRAPH_ID, GCMetric.class);
} }
return gcMetricGraph; return gcMetricGraph;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm; package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService; import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
...@@ -39,7 +39,7 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService { ...@@ -39,7 +39,7 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService {
private Graph<Instance> getHeartBeatGraph() { private Graph<Instance> getHeartBeatGraph() {
if (ObjectUtils.isEmpty(heartBeatGraph)) { if (ObjectUtils.isEmpty(heartBeatGraph)) {
this.heartBeatGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.INST_HEART_BEAT_GRAPH_ID, Instance.class); this.heartBeatGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.INST_HEART_BEAT_GRAPH_ID, Instance.class);
} }
return heartBeatGraph; return heartBeatGraph;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm; package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService; import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
...@@ -39,7 +39,7 @@ public class MemoryMetricService implements IMemoryMetricService { ...@@ -39,7 +39,7 @@ public class MemoryMetricService implements IMemoryMetricService {
private Graph<MemoryMetric> getMemoryMetricGraph() { private Graph<MemoryMetric> getMemoryMetricGraph() {
if (ObjectUtils.isEmpty(memoryMetricGraph)) { if (ObjectUtils.isEmpty(memoryMetricGraph)) {
this.memoryMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); this.memoryMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
} }
return memoryMetricGraph; return memoryMetricGraph;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.jvm; package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.JvmMetricStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService; import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
...@@ -39,7 +39,7 @@ public class MemoryPoolMetricService implements IMemoryPoolMetricService { ...@@ -39,7 +39,7 @@ public class MemoryPoolMetricService implements IMemoryPoolMetricService {
private Graph<MemoryPoolMetric> getMemoryPoolMetricGraph() { private Graph<MemoryPoolMetric> getMemoryPoolMetricGraph() {
if (ObjectUtils.isEmpty(memoryPoolMetricGraph)) { if (ObjectUtils.isEmpty(memoryPoolMetricGraph)) {
this.memoryPoolMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); this.memoryPoolMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraphDefine.MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
} }
return memoryPoolMetricGraph; return memoryPoolMetricGraph;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.register; package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService; import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService; import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
...@@ -47,7 +47,7 @@ public class ApplicationIDService implements IApplicationIDService { ...@@ -47,7 +47,7 @@ public class ApplicationIDService implements IApplicationIDService {
private Graph<Application> getApplicationRegisterGraph() { private Graph<Application> getApplicationRegisterGraph() {
if (ObjectUtils.isEmpty(applicationRegisterGraph)) { if (ObjectUtils.isEmpty(applicationRegisterGraph)) {
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class); this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.APPLICATION_REGISTER_GRAPH_ID, Application.class);
} }
return this.applicationRegisterGraph; return this.applicationRegisterGraph;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.register; package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService; import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.InstanceCacheService; import org.skywalking.apm.collector.cache.service.InstanceCacheService;
...@@ -57,7 +57,7 @@ public class InstanceIDService implements IInstanceIDService { ...@@ -57,7 +57,7 @@ public class InstanceIDService implements IInstanceIDService {
private Graph<Instance> getInstanceRegisterGraph() { private Graph<Instance> getInstanceRegisterGraph() {
if (ObjectUtils.isEmpty(instanceRegisterGraph)) { if (ObjectUtils.isEmpty(instanceRegisterGraph)) {
this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class); this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
} }
return instanceRegisterGraph; return instanceRegisterGraph;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.register; package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph; import org.skywalking.apm.collector.agent.stream.service.graph.RegisterStreamGraphDefine;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService; import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService; import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
...@@ -54,7 +54,7 @@ public class ServiceNameService implements IServiceNameService { ...@@ -54,7 +54,7 @@ public class ServiceNameService implements IServiceNameService {
private Graph<ServiceName> getServiceNameRegisterGraph() { private Graph<ServiceName> getServiceNameRegisterGraph() {
if (ObjectUtils.isEmpty(serviceNameRegisterGraph)) { if (ObjectUtils.isEmpty(serviceNameRegisterGraph)) {
this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraphDefine.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class);
} }
return serviceNameRegisterGraph; return serviceNameRegisterGraph;
} }
......
...@@ -18,12 +18,11 @@ ...@@ -18,12 +18,11 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.application; package org.skywalking.apm.collector.agent.stream.worker.trace.application;
import java.util.ArrayList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
...@@ -36,22 +35,23 @@ import org.slf4j.LoggerFactory; ...@@ -36,22 +35,23 @@ import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ApplicationMappingSpanListener implements RefsListener, FirstSpanListener { public class ApplicationMappingSpanListener implements FirstSpanListener, EntrySpanListener {
private final Logger logger = LoggerFactory.getLogger(ApplicationMappingSpanListener.class); private final Logger logger = LoggerFactory.getLogger(ApplicationMappingSpanListener.class);
private List<ApplicationMapping> applicationMappings = new ArrayList<>(); private List<ApplicationMapping> applicationMappings = new LinkedList<>();
private long timeBucket; private long timeBucket;
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, @Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
String segmentId) {
logger.debug("node mapping listener parse reference"); logger.debug("node mapping listener parse reference");
ApplicationMapping applicationMapping = new ApplicationMapping(Const.EMPTY_STRING); if (spanDecorator.getRefsCount() > 0) {
applicationMapping.setApplicationId(applicationId); ApplicationMapping applicationMapping = new ApplicationMapping(Const.EMPTY_STRING);
applicationMapping.setAddressId(referenceDecorator.getNetworkAddressId()); applicationMapping.setApplicationId(applicationId);
String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(applicationMapping.getAddressId()); applicationMapping.setAddressId(spanDecorator.getRefs(0).getNetworkAddressId());
applicationMapping.setId(id); String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(applicationMapping.getAddressId());
applicationMappings.add(applicationMapping); applicationMapping.setId(id);
applicationMappings.add(applicationMapping);
}
} }
@Override @Override
...@@ -63,12 +63,11 @@ public class ApplicationMappingSpanListener implements RefsListener, FirstSpanLi ...@@ -63,12 +63,11 @@ public class ApplicationMappingSpanListener implements RefsListener, FirstSpanLi
@Override public void build() { @Override public void build() {
logger.debug("node mapping listener build"); logger.debug("node mapping listener build");
Graph<ApplicationMapping> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.APPLICATION_MAPPING_GRAPH_ID, ApplicationMapping.class); Graph<ApplicationMapping> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.APPLICATION_MAPPING_GRAPH_ID, ApplicationMapping.class);
applicationMappings.forEach(applicationMapping -> {
for (ApplicationMapping applicationMapping : applicationMappings) {
applicationMapping.setId(timeBucket + Const.ID_SPLIT + applicationMapping.getId()); applicationMapping.setId(timeBucket + Const.ID_SPLIT + applicationMapping.getId());
applicationMapping.setTimeBucket(timeBucket); applicationMapping.setTimeBucket(timeBucket);
logger.debug("push to node mapping aggregation worker, id: {}", applicationMapping.getId()); logger.debug("push to node mapping aggregation worker, id: {}", applicationMapping.getId());
graph.start(applicationMapping); graph.start(applicationMapping);
} });
} }
} }
...@@ -18,28 +18,65 @@ ...@@ -18,28 +18,65 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.application; package org.skywalking.apm.collector.agent.stream.worker.trace.application;
import org.skywalking.apm.collector.agent.stream.service.graph.ApplicationGraphNodeIdDefine;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric; import org.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric;
import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker; import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ApplicationReferenceMetricAggregationWorker extends AggregationWorker<ApplicationReferenceMetric, ApplicationReferenceMetric> { public class ApplicationReferenceMetricAggregationWorker extends AggregationWorker<InstanceReferenceMetric, ApplicationReferenceMetric> {
private final InstanceCacheService instanceCacheService;
public ApplicationReferenceMetricAggregationWorker(ModuleManager moduleManager) { public ApplicationReferenceMetricAggregationWorker(ModuleManager moduleManager) {
super(moduleManager); super(moduleManager);
this.instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
} }
@Override public int id() { @Override public int id() {
return ApplicationReferenceMetricAggregationWorker.class.hashCode(); return ApplicationGraphNodeIdDefine.APPLICATION_REFERENCE_METRIC_AGGREGATION_NODE_ID;
}
@Override protected ApplicationReferenceMetric transform(InstanceReferenceMetric instanceReferenceMetric) {
Integer frontApplicationId = instanceCacheService.get(instanceReferenceMetric.getFrontInstanceId());
Integer behindApplicationId = instanceCacheService.get(instanceReferenceMetric.getBehindInstanceId());
String id = instanceReferenceMetric.getTimeBucket() + Const.ID_SPLIT + frontApplicationId + Const.ID_SPLIT + behindApplicationId;
ApplicationReferenceMetric applicationReferenceMetric = new ApplicationReferenceMetric(id);
applicationReferenceMetric.setFrontApplicationId(frontApplicationId);
applicationReferenceMetric.setBehindApplicationId(behindApplicationId);
applicationReferenceMetric.setTransactionCalls(instanceReferenceMetric.getTransactionCalls());
applicationReferenceMetric.setTransactionErrorCalls(instanceReferenceMetric.getTransactionErrorCalls());
applicationReferenceMetric.setTransactionDurationSum(instanceReferenceMetric.getTransactionDurationSum());
applicationReferenceMetric.setTransactionErrorDurationSum(instanceReferenceMetric.getTransactionErrorDurationSum());
applicationReferenceMetric.setBusinessTransactionCalls(instanceReferenceMetric.getBusinessTransactionCalls());
applicationReferenceMetric.setBusinessTransactionErrorCalls(instanceReferenceMetric.getBusinessTransactionErrorCalls());
applicationReferenceMetric.setBusinessTransactionDurationSum(instanceReferenceMetric.getBusinessTransactionDurationSum());
applicationReferenceMetric.setBusinessTransactionErrorDurationSum(instanceReferenceMetric.getBusinessTransactionErrorDurationSum());
applicationReferenceMetric.setMqTransactionCalls(instanceReferenceMetric.getMqTransactionCalls());
applicationReferenceMetric.setMqTransactionErrorCalls(instanceReferenceMetric.getMqTransactionErrorCalls());
applicationReferenceMetric.setMqTransactionDurationSum(instanceReferenceMetric.getMqTransactionDurationSum());
applicationReferenceMetric.setMqTransactionErrorDurationSum(instanceReferenceMetric.getMqTransactionErrorDurationSum());
return applicationReferenceMetric;
} }
public static class Factory extends AbstractLocalAsyncWorkerProvider<ApplicationReferenceMetric, ApplicationReferenceMetric, ApplicationReferenceMetricAggregationWorker> { public static class Factory extends AbstractLocalAsyncWorkerProvider<InstanceReferenceMetric, ApplicationReferenceMetric, ApplicationReferenceMetricAggregationWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<ApplicationReferenceMetric> queueCreatorService) { public Factory(ModuleManager moduleManager,
QueueCreatorService<InstanceReferenceMetric> queueCreatorService) {
super(moduleManager, queueCreatorService); super(moduleManager, queueCreatorService);
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.application; package org.skywalking.apm.collector.agent.stream.worker.trace.application;
import org.skywalking.apm.collector.agent.stream.service.graph.ApplicationGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.StorageModule;
...@@ -37,7 +38,7 @@ public class ApplicationReferenceMetricPersistenceWorker extends PersistenceWork ...@@ -37,7 +38,7 @@ public class ApplicationReferenceMetricPersistenceWorker extends PersistenceWork
} }
@Override public int id() { @Override public int id() {
return 0; return ApplicationGraphNodeIdDefine.APPLICATION_REFERENCE_METRIC_PERSISTENCE_NODE_ID;
} }
@Override protected boolean needMergeDBData() { @Override protected boolean needMergeDBData() {
...@@ -50,7 +51,8 @@ public class ApplicationReferenceMetricPersistenceWorker extends PersistenceWork ...@@ -50,7 +51,8 @@ public class ApplicationReferenceMetricPersistenceWorker extends PersistenceWork
public static class Factory extends AbstractLocalAsyncWorkerProvider<ApplicationReferenceMetric, ApplicationReferenceMetric, ApplicationReferenceMetricPersistenceWorker> { public static class Factory extends AbstractLocalAsyncWorkerProvider<ApplicationReferenceMetric, ApplicationReferenceMetric, ApplicationReferenceMetricPersistenceWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<ApplicationReferenceMetric> queueCreatorService) { public Factory(ModuleManager moduleManager,
QueueCreatorService<ApplicationReferenceMetric> queueCreatorService) {
super(moduleManager, queueCreatorService); super(moduleManager, queueCreatorService);
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.application; package org.skywalking.apm.collector.agent.stream.worker.trace.application;
import org.skywalking.apm.collector.agent.stream.service.graph.ApplicationGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.remote.service.RemoteSenderService; import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector; import org.skywalking.apm.collector.remote.service.Selector;
...@@ -36,7 +37,7 @@ public class ApplicationReferenceMetricRemoteWorker extends AbstractRemoteWorker ...@@ -36,7 +37,7 @@ public class ApplicationReferenceMetricRemoteWorker extends AbstractRemoteWorker
} }
@Override public int id() { @Override public int id() {
return ApplicationReferenceMetricRemoteWorker.class.hashCode(); return ApplicationGraphNodeIdDefine.APPLICATION_REFERENCE_METRIC_REMOTE_NODE_ID;
} }
@Override protected void onWork(ApplicationReferenceMetric applicationReferenceMetric) throws WorkerException { @Override protected void onWork(ApplicationReferenceMetric applicationReferenceMetric) throws WorkerException {
......
...@@ -20,18 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.application; ...@@ -20,18 +20,13 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.application;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener; import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.InstanceCacheService; import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.configuration.ConfigurationModule; import org.skywalking.apm.collector.configuration.ConfigurationModule;
import org.skywalking.apm.collector.configuration.service.IApdexThresholdService; import org.skywalking.apm.collector.configuration.service.IApdexThresholdService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ApdexThresholdUtils; import org.skywalking.apm.collector.core.util.ApdexThresholdUtils;
import org.skywalking.apm.collector.core.util.CollectionUtils; import org.skywalking.apm.collector.core.util.CollectionUtils;
...@@ -44,7 +39,7 @@ import org.slf4j.LoggerFactory; ...@@ -44,7 +39,7 @@ import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ApplicationReferenceMetricSpanListener implements EntrySpanListener, ExitSpanListener, RefsListener { public class ApplicationReferenceMetricSpanListener implements EntrySpanListener, ExitSpanListener {
private final Logger logger = LoggerFactory.getLogger(ApplicationReferenceMetricSpanListener.class); private final Logger logger = LoggerFactory.getLogger(ApplicationReferenceMetricSpanListener.class);
...@@ -100,22 +95,22 @@ public class ApplicationReferenceMetricSpanListener implements EntrySpanListener ...@@ -100,22 +95,22 @@ public class ApplicationReferenceMetricSpanListener implements EntrySpanListener
} }
} }
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, // @Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId,
String segmentId) { // String segmentId) {
int parentApplicationId = instanceCacheService.get(referenceDecorator.getParentApplicationInstanceId()); // int parentApplicationId = instanceCacheService.get(referenceDecorator.getParentApplicationInstanceId());
//
ApplicationReferenceMetric referenceSum = new ApplicationReferenceMetric(Const.EMPTY_STRING); // ApplicationReferenceMetric referenceSum = new ApplicationReferenceMetric(Const.EMPTY_STRING);
referenceSum.setFrontApplicationId(parentApplicationId); // referenceSum.setFrontApplicationId(parentApplicationId);
referenceSum.setBehindApplicationId(applicationId); // referenceSum.setBehindApplicationId(applicationId);
references.add(referenceSum); // references.add(referenceSum);
} // }
@Override public void build() { @Override public void build() {
logger.debug("node reference summary listener build"); logger.debug("node reference summary listener build");
Graph<ApplicationReferenceMetric> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.APPLICATION_REFERENCE_METRIC_GRAPH_ID, ApplicationReferenceMetric.class); // Graph<ApplicationReferenceMetric> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.APPLICATION_REFERENCE_METRIC_GRAPH_ID, ApplicationReferenceMetric.class);
for (ApplicationReferenceMetric applicationReferenceMetric : applicationReferenceMetrics) { // for (ApplicationReferenceMetric applicationReferenceMetric : applicationReferenceMetrics) {
graph.start(applicationReferenceMetric); // graph.start(applicationReferenceMetric);
} // }
} }
private ApplicationReferenceMetric buildApplicationRefSum(ApplicationReferenceMetric reference, private ApplicationReferenceMetric buildApplicationRefSum(ApplicationReferenceMetric reference,
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.instance;
import org.skywalking.apm.collector.agent.stream.service.graph.InstanceGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceMetricAggregationWorker extends AggregationWorker<ServiceReferenceMetric, InstanceReferenceMetric> {
public InstanceReferenceMetricAggregationWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return InstanceGraphNodeIdDefine.INSTANCE_REFERENCE_METRIC_AGGREGATION_NODE_ID;
}
@Override protected InstanceReferenceMetric transform(ServiceReferenceMetric serviceReferenceMetric) {
String id = serviceReferenceMetric.getTimeBucket() + Const.ID_SPLIT + serviceReferenceMetric.getFrontInstanceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindInstanceId();
InstanceReferenceMetric instanceReferenceMetric = new InstanceReferenceMetric(id);
instanceReferenceMetric.setFrontInstanceId(serviceReferenceMetric.getFrontInstanceId());
instanceReferenceMetric.setBehindInstanceId(serviceReferenceMetric.getBehindInstanceId());
instanceReferenceMetric.setTransactionCalls(serviceReferenceMetric.getTransactionCalls());
instanceReferenceMetric.setTransactionErrorCalls(serviceReferenceMetric.getTransactionErrorCalls());
instanceReferenceMetric.setTransactionDurationSum(serviceReferenceMetric.getTransactionDurationSum());
instanceReferenceMetric.setTransactionErrorDurationSum(serviceReferenceMetric.getTransactionErrorDurationSum());
instanceReferenceMetric.setBusinessTransactionCalls(serviceReferenceMetric.getBusinessTransactionCalls());
instanceReferenceMetric.setBusinessTransactionErrorCalls(instanceReferenceMetric.getBusinessTransactionErrorCalls());
instanceReferenceMetric.setBusinessTransactionDurationSum(instanceReferenceMetric.getBusinessTransactionDurationSum());
instanceReferenceMetric.setBusinessTransactionErrorDurationSum(instanceReferenceMetric.getBusinessTransactionErrorDurationSum());
instanceReferenceMetric.setMqTransactionCalls(instanceReferenceMetric.getMqTransactionCalls());
instanceReferenceMetric.setMqTransactionErrorCalls(instanceReferenceMetric.getMqTransactionErrorCalls());
instanceReferenceMetric.setMqTransactionDurationSum(instanceReferenceMetric.getMqTransactionDurationSum());
instanceReferenceMetric.setMqTransactionErrorDurationSum(instanceReferenceMetric.getMqTransactionErrorDurationSum());
instanceReferenceMetric.setSourceValue(serviceReferenceMetric.getSourceValue());
return instanceReferenceMetric;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, InstanceReferenceMetric, InstanceReferenceMetricAggregationWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<ServiceReferenceMetric> queueCreatorService) {
super(moduleManager, queueCreatorService);
}
@Override public InstanceReferenceMetricAggregationWorker workerInstance(ModuleManager moduleManager) {
return new InstanceReferenceMetricAggregationWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.instance;
import org.skywalking.apm.collector.agent.stream.service.graph.InstanceGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceMetricRemoteWorker extends AbstractRemoteWorker<InstanceReferenceMetric, InstanceReferenceMetric> {
public InstanceReferenceMetricRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return InstanceGraphNodeIdDefine.INSTANCE_REFERENCE_METRIC_REMOTE_NODE_ID;
}
@Override public Selector selector() {
return Selector.HashCode;
}
@Override protected void onWork(InstanceReferenceMetric instanceReferenceMetric) throws WorkerException {
onNext(instanceReferenceMetric);
}
public static class Factory extends AbstractRemoteWorkerProvider<InstanceReferenceMetric, InstanceReferenceMetric, InstanceReferenceMetricRemoteWorker> {
public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) {
super(moduleManager, remoteSenderService, graphId);
}
@Override public InstanceReferenceMetricRemoteWorker workerInstance(ModuleManager moduleManager) {
return new InstanceReferenceMetricRemoteWorker(moduleManager);
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.instance;
import org.skywalking.apm.collector.agent.stream.service.graph.InstanceGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceReferenceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class InstanceReferencePersistenceWorker extends PersistenceWorker<InstanceReferenceMetric, InstanceReferenceMetric> {
public InstanceReferencePersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return InstanceGraphNodeIdDefine.INSTANCE_REFERENCE_METRIC_PERSISTENCE_NODE_ID;
}
@Override protected IPersistenceDAO persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IInstanceReferenceMetricPersistenceDAO.class);
}
@Override protected boolean needMergeDBData() {
return true;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstanceReferenceMetric, InstanceReferenceMetric, InstanceReferencePersistenceWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<InstanceReferenceMetric> queueCreatorService) {
super(moduleManager, queueCreatorService);
}
@Override public InstanceReferencePersistenceWorker workerInstance(ModuleManager moduleManager) {
return new InstanceReferencePersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
...@@ -21,8 +21,6 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.service; ...@@ -21,8 +21,6 @@ package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; import org.skywalking.apm.collector.cache.service.ServiceNameCacheService;
...@@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory; ...@@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener, EntrySpanListener { public class ServiceEntrySpanListener implements FirstSpanListener, EntrySpanListener {
private final Logger logger = LoggerFactory.getLogger(ServiceEntrySpanListener.class); private final Logger logger = LoggerFactory.getLogger(ServiceEntrySpanListener.class);
...@@ -61,11 +59,9 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener ...@@ -61,11 +59,9 @@ public class ServiceEntrySpanListener implements RefsListener, FirstSpanListener
this.entryServiceId = spanDecorator.getOperationNameId(); this.entryServiceId = spanDecorator.getOperationNameId();
this.entryServiceName = serviceNameCacheService.getSplitServiceName(serviceNameCacheService.get(entryServiceId)); this.entryServiceName = serviceNameCacheService.getSplitServiceName(serviceNameCacheService.get(entryServiceId));
this.hasEntry = true; this.hasEntry = true;
} if (spanDecorator.getRefsCount() > 0) {
this.hasReference = true;
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int instanceId, }
String segmentId) {
hasReference = true;
} }
@Override @Override
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.service; package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.agent.stream.service.graph.ServiceGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric; import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
...@@ -34,7 +35,7 @@ public class ServiceReferenceMetricAggregationWorker extends AggregationWorker<S ...@@ -34,7 +35,7 @@ public class ServiceReferenceMetricAggregationWorker extends AggregationWorker<S
} }
@Override public int id() { @Override public int id() {
return ServiceReferenceMetricAggregationWorker.class.hashCode(); return ServiceGraphNodeIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_NODE_ID;
} }
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetric, ServiceReferenceMetricAggregationWorker> { public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetric, ServiceReferenceMetricAggregationWorker> {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.service; package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.agent.stream.service.graph.ServiceGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule; import org.skywalking.apm.collector.storage.StorageModule;
...@@ -37,7 +38,7 @@ public class ServiceReferenceMetricPersistenceWorker extends PersistenceWorker<S ...@@ -37,7 +38,7 @@ public class ServiceReferenceMetricPersistenceWorker extends PersistenceWorker<S
} }
@Override public int id() { @Override public int id() {
return 0; return ServiceGraphNodeIdDefine.SERVICE_REFERENCE_METRIC_PERSISTENCE_NODE_ID;
} }
@Override protected boolean needMergeDBData() { @Override protected boolean needMergeDBData() {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.service; package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.agent.stream.service.graph.ServiceGraphNodeIdDefine;
import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.remote.service.RemoteSenderService; import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector; import org.skywalking.apm.collector.remote.service.Selector;
...@@ -36,7 +37,7 @@ public class ServiceReferenceMetricRemoteWorker extends AbstractRemoteWorker<Ser ...@@ -36,7 +37,7 @@ public class ServiceReferenceMetricRemoteWorker extends AbstractRemoteWorker<Ser
} }
@Override public int id() { @Override public int id() {
return ServiceReferenceMetricRemoteWorker.class.hashCode(); return ServiceGraphNodeIdDefine.SERVICE_REFERENCE_METRIC_REMOTE_NODE_ID;
} }
@Override protected void onWork(ServiceReferenceMetric serviceReferenceMetric) throws WorkerException { @Override protected void onWork(ServiceReferenceMetric serviceReferenceMetric) throws WorkerException {
......
...@@ -22,32 +22,32 @@ import java.util.LinkedList; ...@@ -22,32 +22,32 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener; import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener; import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.RefsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.agent.stream.service.trace.MetricSource;
import org.skywalking.apm.collector.core.graph.Graph; import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric; import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.skywalking.apm.network.proto.SpanLayer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ServiceReferenceMetricSpanListener implements FirstSpanListener, EntrySpanListener, RefsListener { public class ServiceReferenceMetricSpanListener implements FirstSpanListener, EntrySpanListener, ExitSpanListener {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceMetricSpanListener.class); private final Logger logger = LoggerFactory.getLogger(ServiceReferenceMetricSpanListener.class);
private List<ReferenceDecorator> referenceServices = new LinkedList<>(); private List<ServiceReferenceMetric> entryReferenceMetric = new LinkedList<>();
private int serviceId = 0; private List<ServiceReferenceMetric> exitReferenceMetric = new LinkedList<>();
private long startTime = 0; private SpanDecorator entrySpanDecorator;
private long endTime = 0;
private boolean isError = false;
private long timeBucket; private long timeBucket;
private boolean hasEntry = false;
@Override @Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId, public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
...@@ -55,80 +55,110 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En ...@@ -55,80 +55,110 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime()); timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
} }
@Override public void parseRef(ReferenceDecorator referenceDecorator, int applicationId, int applicationInstanceId,
String segmentId) {
referenceServices.add(referenceDecorator);
}
@Override @Override
public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) { String segmentId) {
serviceId = spanDecorator.getOperationNameId(); if (spanDecorator.getRefsCount() > 0) {
startTime = spanDecorator.getStartTime(); for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
endTime = spanDecorator.getEndTime(); ReferenceDecorator reference = spanDecorator.getRefs(i);
isError = spanDecorator.getIsError(); ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING);
this.hasEntry = true; serviceReferenceMetric.setEntryServiceId(reference.getEntryServiceId());
serviceReferenceMetric.setEntryInstanceId(reference.getEntryApplicationInstanceId());
serviceReferenceMetric.setFrontServiceId(reference.getParentServiceId());
serviceReferenceMetric.setFrontInstanceId(reference.getParentApplicationInstanceId());
serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId());
serviceReferenceMetric.setBehindInstanceId(instanceId);
serviceReferenceMetric.setSourceValue(MetricSource.Entry.ordinal());
calculateCost(serviceReferenceMetric, spanDecorator, true);
entryReferenceMetric.add(serviceReferenceMetric);
}
} else {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING);
serviceReferenceMetric.setEntryServiceId(spanDecorator.getOperationNameId());
serviceReferenceMetric.setEntryInstanceId(instanceId);
serviceReferenceMetric.setFrontServiceId(Const.NONE_SERVICE_ID);
serviceReferenceMetric.setFrontInstanceId(instanceId);
serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId());
serviceReferenceMetric.setBehindServiceId(instanceId);
serviceReferenceMetric.setSourceValue(MetricSource.Entry.ordinal());
calculateCost(serviceReferenceMetric, spanDecorator, false);
entryReferenceMetric.add(serviceReferenceMetric);
}
this.entrySpanDecorator = spanDecorator;
} }
private void calculateCost(ServiceReferenceMetric serviceReferenceMetric, long startTime, @Override public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
long endTime, boolean isError) { ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING);
long duration = endTime - startTime;
serviceReferenceMetric.setTransactionCalls(1L); serviceReferenceMetric.setFrontInstanceId(instanceId);
serviceReferenceMetric.setTransactionDurationSum(duration); serviceReferenceMetric.setBehindServiceId(spanDecorator.getOperationNameId());
serviceReferenceMetric.setSourceValue(MetricSource.Exit.ordinal());
calculateCost(serviceReferenceMetric, spanDecorator, true);
exitReferenceMetric.add(serviceReferenceMetric);
}
private void calculateCost(ServiceReferenceMetric serviceReferenceMetric, SpanDecorator spanDecorator,
boolean hasReference) {
long duration = spanDecorator.getStartTime() - spanDecorator.getEndTime();
if (isError) { if (spanDecorator.getIsError()) {
serviceReferenceMetric.setTransactionErrorCalls(1L); serviceReferenceMetric.setTransactionErrorCalls(1L);
serviceReferenceMetric.setTransactionErrorDurationSum(duration); serviceReferenceMetric.setTransactionErrorDurationSum(duration);
} else {
serviceReferenceMetric.setTransactionCalls(1L);
serviceReferenceMetric.setTransactionDurationSum(duration);
} }
}
@Override public void build() { if (hasReference) {
logger.debug("service reference listener build"); if (spanDecorator.getIsError()) {
if (hasEntry) { serviceReferenceMetric.setBusinessTransactionErrorCalls(1L);
if (referenceServices.size() > 0) { serviceReferenceMetric.setBusinessTransactionErrorDurationSum(duration);
referenceServices.forEach(reference -> {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING);
int entryServiceId = reference.getEntryServiceId();
int frontServiceId = reference.getParentServiceId();
int behindServiceId = serviceId;
calculateCost(serviceReferenceMetric, startTime, endTime, isError);
logger.debug("has reference, entryServiceId: {}", entryServiceId);
sendToAggregationWorker(serviceReferenceMetric, entryServiceId, frontServiceId, behindServiceId);
});
} else { } else {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING); serviceReferenceMetric.setBusinessTransactionCalls(1L);
int entryServiceId = serviceId; serviceReferenceMetric.setBusinessTransactionDurationSum(duration);
int frontServiceId = Const.NONE_SERVICE_ID; }
int behindServiceId = serviceId; }
calculateCost(serviceReferenceMetric, startTime, endTime, isError); if (SpanLayer.MQ.equals(spanDecorator.getSpanLayer())) {
sendToAggregationWorker(serviceReferenceMetric, entryServiceId, frontServiceId, behindServiceId); if (spanDecorator.getIsError()) {
serviceReferenceMetric.setMqTransactionErrorCalls(1L);
serviceReferenceMetric.setMqTransactionErrorDurationSum(duration);
} else {
serviceReferenceMetric.setMqTransactionCalls(1L);
serviceReferenceMetric.setMqTransactionDurationSum(duration);
} }
} }
} }
private void sendToAggregationWorker(ServiceReferenceMetric serviceReferenceMetric, int entryServiceId, @Override public void build() {
int frontServiceId, logger.debug("service reference listener build");
int behindServiceId) { Graph<ServiceReferenceMetric> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReferenceMetric.class);
StringBuilder idBuilder = new StringBuilder(); entryReferenceMetric.forEach(serviceReferenceMetric -> {
idBuilder.append(timeBucket).append(Const.ID_SPLIT); String id = timeBucket + Const.ID_SPLIT + serviceReferenceMetric.getEntryServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId();
idBuilder.append(entryServiceId).append(Const.ID_SPLIT); serviceReferenceMetric.setId(id);
serviceReferenceMetric.setEntryServiceId(entryServiceId); serviceReferenceMetric.setTimeBucket(timeBucket);
logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId());
idBuilder.append(frontServiceId).append(Const.ID_SPLIT); graph.start(serviceReferenceMetric);
serviceReferenceMetric.setFrontServiceId(frontServiceId); });
idBuilder.append(behindServiceId); exitReferenceMetric.forEach(serviceReferenceMetric -> {
serviceReferenceMetric.setBehindServiceId(behindServiceId); serviceReferenceMetric.setEntryInstanceId(Const.NONE_INSTANCE_ID);
if (ObjectUtils.isNotEmpty(entrySpanDecorator)) {
serviceReferenceMetric.setEntryServiceId(entrySpanDecorator.getOperationNameId());
serviceReferenceMetric.setFrontServiceId(entrySpanDecorator.getOperationNameId());
} else {
serviceReferenceMetric.setEntryServiceId(Const.NONE_SERVICE_ID);
serviceReferenceMetric.setFrontServiceId(Const.NONE_SERVICE_ID);
}
serviceReferenceMetric.setId(idBuilder.toString()); String id = timeBucket + Const.ID_SPLIT + serviceReferenceMetric.getEntryServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId();
serviceReferenceMetric.setTimeBucket(timeBucket); serviceReferenceMetric.setId(id);
logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId()); serviceReferenceMetric.setTimeBucket(timeBucket);
Graph<ServiceReferenceMetric> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SERVICE_REFERENCE_GRAPH_ID, ServiceReferenceMetric.class); graph.start(serviceReferenceMetric);
graph.start(serviceReferenceMetric); });
} }
} }
...@@ -41,5 +41,15 @@ ...@@ -41,5 +41,15 @@
<artifactId>collector-agent-stream-define</artifactId> <artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.stream.AgentStreamModule; ...@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.core.module.Module; import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageModule;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -50,6 +51,6 @@ public class AlertingModuleProvider extends ModuleProvider { ...@@ -50,6 +51,6 @@ public class AlertingModuleProvider extends ModuleProvider {
} }
@Override public String[] requiredModules() { @Override public String[] requiredModules() {
return new String[] {AgentStreamModule.NAME}; return new String[] {AgentStreamModule.NAME, StorageModule.NAME};
} }
} }
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.alerting.worker;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.table.alerting.AlertingList;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class AlertingListAggregationWorker extends AbstractLocalAsyncWorker<AlertingList, AlertingList> {
public AlertingListAggregationWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(AlertingList message) throws WorkerException {
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.alerting.worker;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IAlertingListPersistenceDAO;
import org.skywalking.apm.collector.storage.table.alerting.AlertingList;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class AlertingListPersistenceWorker extends PersistenceWorker<AlertingList, AlertingList> {
public AlertingListPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IAlertingListPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<AlertingList, AlertingList, AlertingListPersistenceWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<AlertingList> queueCreatorService) {
super(moduleManager, queueCreatorService);
}
@Override public AlertingListPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new AlertingListPersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
...@@ -30,11 +30,11 @@ ui: ...@@ -30,11 +30,11 @@ ui:
host: localhost host: localhost
port: 12800 port: 12800
context_path: / context_path: /
#storage: storage:
# elasticsearch: elasticsearch:
# cluster_name: CollectorDBCluster cluster_name: CollectorDBCluster
# cluster_transport_sniffer: true cluster_transport_sniffer: true
# cluster_nodes: localhost:9300 cluster_nodes: localhost:9300
# index_shards_number: 2 index_shards_number: 2
# index_replicas_number: 0 index_replicas_number: 0
# ttl: 7 ttl: 7
\ No newline at end of file \ No newline at end of file
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
~ Project repository: https://github.com/OpenSkywalking/skywalking ~ Project repository: https://github.com/OpenSkywalking/skywalking
--> -->
<Configuration status="info"> <Configuration status="debug">
<Appenders> <Appenders>
<Console name="Console" target="SYSTEM_OUT"> <Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/> <PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
...@@ -26,10 +26,11 @@ ...@@ -26,10 +26,11 @@
<Loggers> <Loggers>
<logger name="org.eclipse.jetty" level="INFO"/> <logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/> <logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
<logger name="org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler" level="INFO"/> <logger name="org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler" level="INFO"/>
<logger name="org.skywalking.apm.collector.stream.timer.PersistenceTimer" level="INFO"/> <logger name="org.skywalking.apm.collector.stream.timer.PersistenceTimer" level="INFO"/>
<logger name="io.grpc.netty" level="INFO"/> <logger name="io.grpc.netty" level="INFO"/>
<Root level="info"> <Root level="debug">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Root> </Root>
</Loggers> </Loggers>
......
...@@ -92,8 +92,7 @@ public class ElasticSearchClient implements Client { ...@@ -92,8 +92,7 @@ public class ElasticSearchClient implements Client {
List<AddressPairs> pairsList = new LinkedList<>(); List<AddressPairs> pairsList = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes); logger.info("elasticsearch cluster nodes: {}", nodes);
String[] nodesSplit = nodes.split(","); String[] nodesSplit = nodes.split(",");
for (int i = 0; i < nodesSplit.length; i++) { for (String node : nodesSplit) {
String node = nodesSplit[i];
String host = node.split(":")[0]; String host = node.split(":")[0];
String port = node.split(":")[1]; String port = node.split(":")[1];
pairsList.add(new AddressPairs(host, Integer.valueOf(port))); pairsList.add(new AddressPairs(host, Integer.valueOf(port)));
...@@ -106,7 +105,7 @@ public class ElasticSearchClient implements Client { ...@@ -106,7 +105,7 @@ public class ElasticSearchClient implements Client {
private String host; private String host;
private Integer port; private Integer port;
public AddressPairs(String host, Integer port) { AddressPairs(String host, Integer port) {
this.host = host; this.host = host;
this.port = port; this.port = port;
} }
......
...@@ -106,15 +106,33 @@ public abstract class Data extends EndOfBatchQueueMessage { ...@@ -106,15 +106,33 @@ public abstract class Data extends EndOfBatchQueueMessage {
} }
public Long getDataLong(int position) { public Long getDataLong(int position) {
return dataLongs[position]; if (position + 1 > dataLongs.length) {
throw new IndexOutOfBoundsException();
} else if (dataLongs[position] == null) {
return 0L;
} else {
return dataLongs[position];
}
} }
public Double getDataDouble(int position) { public Double getDataDouble(int position) {
return dataDoubles[position]; if (position + 1 > dataDoubles.length) {
throw new IndexOutOfBoundsException();
} else if (dataDoubles[position] == null) {
return 0D;
} else {
return dataDoubles[position];
}
} }
public Integer getDataInteger(int position) { public Integer getDataInteger(int position) {
return dataIntegers[position]; if (position + 1 > dataIntegers.length) {
throw new IndexOutOfBoundsException();
} else if (dataIntegers[position] == null) {
return 0;
} else {
return dataIntegers[position];
}
} }
public Boolean getDataBoolean(int position) { public Boolean getDataBoolean(int position) {
...@@ -136,27 +154,27 @@ public abstract class Data extends EndOfBatchQueueMessage { ...@@ -136,27 +154,27 @@ public abstract class Data extends EndOfBatchQueueMessage {
public void mergeData(Data newData) { public void mergeData(Data newData) {
for (int i = 0; i < stringColumns.length; i++) { for (int i = 0; i < stringColumns.length; i++) {
String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.dataStrings[i]); String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.getDataString(i));
this.dataStrings[i] = stringData; this.dataStrings[i] = stringData;
} }
for (int i = 0; i < longColumns.length; i++) { for (int i = 0; i < longColumns.length; i++) {
Long longData = longColumns[i].getOperation().operate(newData.getDataLong(i), this.dataLongs[i]); Long longData = longColumns[i].getOperation().operate(newData.getDataLong(i), this.getDataLong(i));
this.dataLongs[i] = longData; this.dataLongs[i] = longData;
} }
for (int i = 0; i < doubleColumns.length; i++) { for (int i = 0; i < doubleColumns.length; i++) {
Double doubleData = doubleColumns[i].getOperation().operate(newData.getDataDouble(i), this.dataDoubles[i]); Double doubleData = doubleColumns[i].getOperation().operate(newData.getDataDouble(i), this.getDataDouble(i));
this.dataDoubles[i] = doubleData; this.dataDoubles[i] = doubleData;
} }
for (int i = 0; i < integerColumns.length; i++) { for (int i = 0; i < integerColumns.length; i++) {
Integer integerData = integerColumns[i].getOperation().operate(newData.getDataInteger(i), this.dataIntegers[i]); Integer integerData = integerColumns[i].getOperation().operate(newData.getDataInteger(i), this.getDataInteger(i));
this.dataIntegers[i] = integerData; this.dataIntegers[i] = integerData;
} }
for (int i = 0; i < booleanColumns.length; i++) { for (int i = 0; i < booleanColumns.length; i++) {
Boolean booleanData = booleanColumns[i].getOperation().operate(newData.getDataBoolean(i), this.dataBooleans[i]); Boolean booleanData = booleanColumns[i].getOperation().operate(newData.getDataBoolean(i), this.getDataBoolean(i));
this.dataBooleans[i] = booleanData; this.dataBooleans[i] = booleanData;
} }
for (int i = 0; i < byteColumns.length; i++) { for (int i = 0; i < byteColumns.length; i++) {
byte[] byteData = byteColumns[i].getOperation().operate(newData.getDataBytes(i), this.dataBytes[i]); byte[] byteData = byteColumns[i].getOperation().operate(newData.getDataBytes(i), this.getDataBytes(i));
this.dataBytes[i] = byteData; this.dataBytes[i] = byteData;
} }
} }
......
...@@ -25,6 +25,7 @@ public class Const { ...@@ -25,6 +25,7 @@ public class Const {
public static final String ID_SPLIT = "_"; public static final String ID_SPLIT = "_";
public static final int USER_ID = 1; public static final int USER_ID = 1;
public static final int NONE_SERVICE_ID = 1; public static final int NONE_SERVICE_ID = 1;
public static final int NONE_INSTANCE_ID = 1;
public static final String NONE_SERVICE_NAME = "None"; public static final String NONE_SERVICE_NAME = "None";
public static final String USER_CODE = "User"; public static final String USER_CODE = "User";
public static final String SEGMENT_SPAN_SPLIT = "S"; public static final String SEGMENT_SPAN_SPLIT = "S";
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface IInstanceReferenceMetricPersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
...@@ -36,4 +36,5 @@ public abstract class CommonMetricTable extends CommonTable { ...@@ -36,4 +36,5 @@ public abstract class CommonMetricTable extends CommonTable {
public static final String COLUMN_MQ_TRANSACTION_ERROR_CALLS = "mq_transaction_error_calls"; public static final String COLUMN_MQ_TRANSACTION_ERROR_CALLS = "mq_transaction_error_calls";
public static final String COLUMN_MQ_TRANSACTION_DURATION_SUM = "mq_transaction_duration_sum"; public static final String COLUMN_MQ_TRANSACTION_DURATION_SUM = "mq_transaction_duration_sum";
public static final String COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM = "mq_transaction_error_duration_sum"; public static final String COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM = "mq_transaction_error_duration_sum";
public static final String COLUMN_SOURCE_VALUE = "source_value";
} }
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.instance;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceMetric extends Data {
private static final Column[] STRING_COLUMNS = {
new Column(InstanceReferenceMetricTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(InstanceReferenceMetricTable.COLUMN_TIME_BUCKET, new NonOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, new AddOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, new AddOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(InstanceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, new NonOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, new NonOperation()),
new Column(InstanceReferenceMetricTable.COLUMN_SOURCE_VALUE, new NonOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public InstanceReferenceMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public Integer getFrontInstanceId() {
return getDataInteger(0);
}
public void setFrontInstanceId(Integer frontInstanceId) {
setDataInteger(0, frontInstanceId);
}
public Integer getBehindInstanceId() {
return getDataInteger(1);
}
public void setBehindInstanceId(Integer behindInstanceId) {
setDataInteger(1, behindInstanceId);
}
public Integer getSourceValue() {
return getDataInteger(2);
}
public void setSourceValue(Integer sourceValue) {
setDataInteger(2, sourceValue);
}
public Long getTimeBucket() {
return getDataLong(0);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(0, timeBucket);
}
public Long getTransactionCalls() {
return getDataLong(1);
}
public void setTransactionCalls(Long transactionCalls) {
setDataLong(1, transactionCalls);
}
public Long getTransactionErrorCalls() {
return getDataLong(2);
}
public void setTransactionErrorCalls(Long transactionErrorCalls) {
setDataLong(2, transactionErrorCalls);
}
public Long getTransactionDurationSum() {
return getDataLong(3);
}
public void setTransactionDurationSum(Long transactionDurationSum) {
setDataLong(3, transactionDurationSum);
}
public Long getTransactionErrorDurationSum() {
return getDataLong(4);
}
public void setTransactionErrorDurationSum(Long transactionErrorDurationSum) {
setDataLong(4, transactionErrorDurationSum);
}
public Long getBusinessTransactionCalls() {
return getDataLong(5);
}
public void setBusinessTransactionCalls(Long businessTransactionCalls) {
setDataLong(5, businessTransactionCalls);
}
public Long getBusinessTransactionErrorCalls() {
return getDataLong(6);
}
public void setBusinessTransactionErrorCalls(Long businessTransactionErrorCalls) {
setDataLong(6, businessTransactionErrorCalls);
}
public Long getBusinessTransactionDurationSum() {
return getDataLong(7);
}
public void setBusinessTransactionDurationSum(Long businessTransactionDurationSum) {
setDataLong(7, businessTransactionDurationSum);
}
public Long getBusinessTransactionErrorDurationSum() {
return getDataLong(8);
}
public void setBusinessTransactionErrorDurationSum(Long businessTransactionErrorDurationSum) {
setDataLong(8, businessTransactionErrorDurationSum);
}
public Long getMqTransactionCalls() {
return getDataLong(9);
}
public void setMqTransactionCalls(Long mqTransactionCalls) {
setDataLong(9, mqTransactionCalls);
}
public Long getMqTransactionErrorCalls() {
return getDataLong(10);
}
public void setMqTransactionErrorCalls(Long mqTransactionErrorCalls) {
setDataLong(10, mqTransactionErrorCalls);
}
public Long getMqTransactionDurationSum() {
return getDataLong(11);
}
public void setMqTransactionDurationSum(Long mqTransactionDurationSum) {
setDataLong(11, mqTransactionDurationSum);
}
public Long getMqTransactionErrorDurationSum() {
return getDataLong(12);
}
public void setMqTransactionErrorDurationSum(Long mqTransactionErrorDurationSum) {
setDataLong(12, mqTransactionErrorDurationSum);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.instance;
import org.skywalking.apm.collector.storage.table.CommonMetricTable;
/**
* @author peng-yongsheng
*/
public class InstanceReferenceMetricTable extends CommonMetricTable {
public static final String TABLE = "instance_reference_metric";
public static final String COLUMN_FRONT_INSTANCE_ID = "front_instance_id";
public static final String COLUMN_BEHIND_INSTANCE_ID = "behind_instance_id";
}
...@@ -55,6 +55,12 @@ public class ServiceReferenceMetric extends Data { ...@@ -55,6 +55,12 @@ public class ServiceReferenceMetric extends Data {
new Column(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, new NonOperation()), new Column(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, new NonOperation()),
new Column(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, new NonOperation()), new Column(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, new NonOperation()),
new Column(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, new NonOperation()), new Column(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, new NonOperation()),
new Column(ServiceReferenceMetricTable.COLUMN_ENTRY_INSTANCE_ID, new NonOperation()),
new Column(ServiceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, new NonOperation()),
new Column(ServiceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, new NonOperation()),
new Column(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, new NonOperation()),
}; };
private static final Column[] BOOLEAN_COLUMNS = {}; private static final Column[] BOOLEAN_COLUMNS = {};
...@@ -89,6 +95,38 @@ public class ServiceReferenceMetric extends Data { ...@@ -89,6 +95,38 @@ public class ServiceReferenceMetric extends Data {
setDataInteger(2, behindServiceId); setDataInteger(2, behindServiceId);
} }
public Integer getEntryInstanceId() {
return getDataInteger(3);
}
public void setEntryInstanceId(Integer entryInstanceId) {
setDataInteger(3, entryInstanceId);
}
public Integer getFrontInstanceId() {
return getDataInteger(4);
}
public void setFrontInstanceId(Integer frontInstanceId) {
setDataInteger(4, frontInstanceId);
}
public Integer getBehindInstanceId() {
return getDataInteger(5);
}
public void setBehindInstanceId(Integer behindInstanceId) {
setDataInteger(5, behindInstanceId);
}
public Integer getSourceValue() {
return getDataInteger(6);
}
public void setSourceValue(Integer sourceValue) {
setDataInteger(6, sourceValue);
}
public Long getTimeBucket() { public Long getTimeBucket() {
return getDataLong(0); return getDataLong(0);
} }
......
...@@ -26,6 +26,9 @@ import org.skywalking.apm.collector.storage.table.CommonMetricTable; ...@@ -26,6 +26,9 @@ import org.skywalking.apm.collector.storage.table.CommonMetricTable;
public class ServiceReferenceMetricTable extends CommonMetricTable { public class ServiceReferenceMetricTable extends CommonMetricTable {
public static final String TABLE = "service_reference_metric"; public static final String TABLE = "service_reference_metric";
public static final String COLUMN_ENTRY_SERVICE_ID = "entry_service_id"; public static final String COLUMN_ENTRY_SERVICE_ID = "entry_service_id";
public static final String COLUMN_ENTRY_INSTANCE_ID = "entry_instance_id";
public static final String COLUMN_FRONT_SERVICE_ID = "front_service_id"; public static final String COLUMN_FRONT_SERVICE_ID = "front_service_id";
public static final String COLUMN_FRONT_INSTANCE_ID = "front_instance_id";
public static final String COLUMN_BEHIND_SERVICE_ID = "behind_service_id"; public static final String COLUMN_BEHIND_SERVICE_ID = "behind_service_id";
public static final String COLUMN_BEHIND_INSTANCE_ID = "behind_instance_id";
} }
...@@ -53,6 +53,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe ...@@ -53,6 +53,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe
serviceReferenceMetric.setEntryServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID)).intValue()); serviceReferenceMetric.setEntryServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID)).intValue());
serviceReferenceMetric.setFrontServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID)).intValue()); serviceReferenceMetric.setFrontServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReferenceMetric.setBehindServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID)).intValue()); serviceReferenceMetric.setBehindServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReferenceMetric.setSourceValue(((Number)source.get(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceMetric.setTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue()); serviceReferenceMetric.setTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
serviceReferenceMetric.setTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue()); serviceReferenceMetric.setTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue());
...@@ -81,6 +82,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe ...@@ -81,6 +82,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe
source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
...@@ -107,6 +109,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe ...@@ -107,6 +109,7 @@ public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements ISe
source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
......
...@@ -39,6 +39,7 @@ public class ServiceReferenceMetricEsTableDefine extends ElasticSearchTableDefin ...@@ -39,6 +39,7 @@ public class ServiceReferenceMetricEsTableDefine extends ElasticSearchTableDefin
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ElasticSearchColumnDefine.Type.Long.name()));
......
...@@ -58,6 +58,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe ...@@ -58,6 +58,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe
serviceReferenceMetric.setEntryServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID)); serviceReferenceMetric.setEntryServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID));
serviceReferenceMetric.setFrontServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID)); serviceReferenceMetric.setFrontServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID));
serviceReferenceMetric.setBehindServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID)); serviceReferenceMetric.setBehindServiceId(rs.getInt(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID));
serviceReferenceMetric.setSourceValue(rs.getInt(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE));
serviceReferenceMetric.setTransactionCalls(rs.getLong(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)); serviceReferenceMetric.setTransactionCalls(rs.getLong(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS));
serviceReferenceMetric.setTransactionErrorCalls(rs.getLong(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)); serviceReferenceMetric.setTransactionErrorCalls(rs.getLong(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
...@@ -91,6 +92,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe ...@@ -91,6 +92,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe
source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
...@@ -122,6 +124,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe ...@@ -122,6 +124,7 @@ public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements ISe
source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, data.getEntryServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId()); source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls()); source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
......
...@@ -36,6 +36,7 @@ public class ServiceReferenceMetricH2TableDefine extends H2TableDefine { ...@@ -36,6 +36,7 @@ public class ServiceReferenceMetricH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, H2ColumnDefine.Type.Bigint.name()));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册