提交 8e8029d0 编写于 作者: P peng-yongsheng

no message

上级 7c0f47c1
......@@ -20,24 +20,17 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
......@@ -55,81 +48,77 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);
private final Graph memoryMetricGraph;
private final Graph memoryPoolMetricGraph;
private final Graph gcMetricGraph;
private final Graph cpuMetricGraph;
private final Graph heartBeatGraph;
// Resolves each JVM-metric stream graph exactly once, via the graph ids registered
// by JvmMetricStreamGraph, so every collect() call can dispatch metrics without a
// per-message GraphManager lookup.
public JVMMetricsServiceHandler() {
memoryPoolMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.MEMORY_POOL_METRIC_GRAPH_ID);
memoryMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.MEMORY_METRIC_GRAPH_ID);
gcMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID);
cpuMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID);
heartBeatGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.INST_HEART_BEAT_GRAPH_ID);
}
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(context, instanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(context, instanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, instanceId, time, metric.getMemoryPoolList());
sendToGCMetricPersistenceWorker(context, instanceId, time, metric.getGcList());
senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList());
sendToGCMetricPersistenceWorker(instanceId, time, metric.getGcList());
});
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context, int instanceId,
long heartBeatTime) {
InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
heartBeat.setId(String.valueOf(instanceId));
heartBeat.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
heartBeat.setInstanceId(instanceId);
try {
logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
// Builds an Instance heart-beat record keyed by the instance id and feeds it into
// the heart-beat stream graph.
// NOTE(review): method name has a typo ("senTo" -> "sendTo"); renaming requires
// updating the caller in collect() in the same change.
private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
Instance instance = new Instance(String.valueOf(instanceId));
// The raw heart-beat timestamp is truncated to a second-precision time bucket.
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
instance.setInstanceId(instanceId);
logger.debug("send to instance heart beat persistence worker, id: {}", instance.getId());
heartBeatGraph.start(instance);
}
private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, CPU cpu) {
CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric();
cpuMetric.setId(timeBucket + Const.ID_SPLIT + instanceId);
private void sendToCpuMetricPersistenceWorker(int instanceId, long timeBucket, CPU cpu) {
CpuMetric cpuMetric = new CpuMetric(timeBucket + Const.ID_SPLIT + instanceId);
cpuMetric.setInstanceId(instanceId);
cpuMetric.setUsagePercent(cpu.getUsagePercent());
cpuMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to cpu metric persistence worker, id: {}", cpuMetric.getId());
context.getClusterWorkerContext().lookup(CpuMetricPersistenceWorker.WorkerRole.INSTANCE).tell(cpuMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, List<Memory> memories) {
logger.debug("send to cpu metric graph, id: {}", cpuMetric.getId());
cpuMetricGraph.start(cpuMetric);
}
private void sendToMemoryMetricPersistenceWorker(int instanceId, long timeBucket, List<Memory> memories) {
memories.forEach(memory -> {
MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric();
memoryMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap()));
memoryMetric.setApplicationInstanceId(instanceId);
memoryMetric.setHeap(memory.getIsHeap());
MemoryMetric memoryMetric = new MemoryMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap()));
memoryMetric.setInstanceId(instanceId);
memoryMetric.setIsHeap(memory.getIsHeap());
memoryMetric.setInit(memory.getInit());
memoryMetric.setMax(memory.getMax());
memoryMetric.setUsed(memory.getUsed());
memoryMetric.setCommitted(memory.getCommitted());
memoryMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory metric persistence worker, id: {}", memoryMetric.getId());
context.getClusterWorkerContext().lookup(MemoryMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
logger.debug("send to memory metric graph, id: {}", memoryMetric.getId());
memoryMetricGraph.start(memoryMetric);
});
}
private void sendToMemoryPoolMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, List<MemoryPool> memoryPools) {
private void sendToMemoryPoolMetricPersistenceWorker(int instanceId, long timeBucket,
List<MemoryPool> memoryPools) {
memoryPools.forEach(memoryPool -> {
MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric();
memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
memoryPoolMetric.setInstanceId(instanceId);
memoryPoolMetric.setPoolType(memoryPool.getType().getNumber());
memoryPoolMetric.setInit(memoryPool.getInit());
......@@ -137,31 +126,23 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
memoryPoolMetric.setUsed(memoryPool.getUsed());
memoryPoolMetric.setCommitted(memoryPool.getCommited());
memoryPoolMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory pool metric persistence worker, id: {}", memoryPoolMetric.getId());
context.getClusterWorkerContext().lookup(MemoryPoolMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryPoolMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
logger.debug("send to memory pool metric graph, id: {}", memoryPoolMetric.getId());
memoryPoolMetricGraph.start(memoryPoolMetric);
});
}
private void sendToGCMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, List<GC> gcs) {
private void sendToGCMetricPersistenceWorker(int instanceId, long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> {
GCMetricDataDefine.GCMetric gcMetric = new GCMetricDataDefine.GCMetric();
gcMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue()));
GCMetric gcMetric = new GCMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue()));
gcMetric.setInstanceId(instanceId);
gcMetric.setPhrase(gc.getPhraseValue());
gcMetric.setCount(gc.getCount());
gcMetric.setTime(gc.getTime());
gcMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId());
context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
logger.debug("send to gc metric graph, id: {}", gcMetric.getId());
gcMetricGraph.start(gcMetric);
});
}
}
......@@ -33,7 +33,7 @@
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-define</artifactId>
<artifactId>collector-stream-provider</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.base.WorkerRef;
/**
* @author peng-yongsheng
*/
/**
 * Registers one stream graph per JVM metric type (GC, CPU, memory, memory pool,
 * instance heart beat) in the global {@link GraphManager}, keyed by the public
 * graph-id constants below. JVMMetricsServiceHandler looks these graphs up by id.
 */
public class JvmMetricStreamGraph {
public static final int GC_METRIC_GRAPH_ID = 100;
public static final int MEMORY_METRIC_GRAPH_ID = 101;
public static final int MEMORY_POOL_METRIC_GRAPH_ID = 102;
public static final int CPU_METRIC_GRAPH_ID = 103;
public static final int INST_HEART_BEAT_GRAPH_ID = 104;
// Single-node graph: GC metrics flow straight into the persistence worker.
public Graph<GCMetric> createGCMetricGraph() {
Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class);
graph.addNode(new GCMetricPersistenceWorker());
return graph;
}
// CPU metrics are forwarded through a worker ref created by the async provider.
// NOTE(review): Factory is constructed with null DAOService/QueueCreatorService and
// create(null) skips the WorkerCreateListener — looks like unfinished wiring; confirm
// before relying on this path at runtime.
public Graph<CpuMetric> createCPUMetricGraph() throws ProviderNotFoundException {
CpuMetricPersistenceWorker.Factory factory = new CpuMetricPersistenceWorker.Factory(null, null);
final WorkerRef workerRef = factory.create(null);
Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
graph.addNode(new NodeProcessor<CpuMetric, CpuMetric>() {
@Override public int id() {
return 0;
}
@Override public void process(CpuMetric INPUT, Next<CpuMetric> next) {
try {
workerRef.tell(INPUT);
// NOTE(review): exception is printed and swallowed; prefer an SLF4J logger
// (logger.error(e.getMessage(), e)) as the rest of the module does.
} catch (WorkerInvokeException e) {
e.printStackTrace();
}
}
});
return graph;
}
// Single-node graph: memory metrics flow straight into the persistence worker.
public Graph<MemoryMetric> createMemoryMetricGraph() {
Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
graph.addNode(new MemoryMetricPersistenceWorker());
return graph;
}
// Single-node graph: memory-pool metrics flow straight into the persistence worker.
public Graph<MemoryPoolMetric> createMemoryPoolMetricGraph() {
Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
graph.addNode(new MemoryPoolMetricPersistenceWorker());
return graph;
}
// Single-node graph: instance heart beats flow straight into the persistence worker.
public Graph<Instance> createHeartBeatGraph() {
Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class);
graph.addNode(new InstHeartBeatPersistenceWorker());
return graph;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
/**
 * Persistence worker for CPU metrics: resolves the CPU-metric stream DAO from the
 * {@link DAOService} and hands it to the generic {@link PersistenceWorker} machinery.
 */
public class CpuMetricPersistenceWorker extends PersistenceWorker {
private final DAOService daoService;
public CpuMetricPersistenceWorker(DAOService daoService) {
super(daoService);
this.daoService = daoService;
}
// CPU samples are insert-only; no merge with rows already in the DB is needed.
@Override protected boolean needMergeDBData() {
return false;
}
// NOTE(review): raw IPersistenceDAO — the concrete type parameters depend on the
// storage implementation (e.g. CpuMetricEsStreamDAO); confirm against PersistenceWorker.
@Override protected IPersistenceDAO persistenceDAO() {
return daoService.getPersistenceDAO(ICpuMetricStreamDAO.class);
}
/** Provider that creates the worker and binds it to a bounded async queue. */
public static class Factory extends AbstractLocalAsyncWorkerProvider<CpuMetricPersistenceWorker> {
public Factory(DAOService daoService, QueueCreatorService queueCreatorService) {
super(daoService, queueCreatorService);
}
@Override
public CpuMetricPersistenceWorker workerInstance(DAOService daoService) {
return new CpuMetricPersistenceWorker(daoService);
}
// Capacity of the async queue feeding this worker.
@Override
public int queueSize() {
return 1024;
}
}
}
......@@ -16,20 +16,22 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.graph;
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
/**
* @author peng-yongsheng
*/
public class GCMetricStreamGraph {
public class GCMetricPersistenceWorker implements NodeProcessor<GCMetric, GCMetric> {
@Override public int id() {
return 1;
}
@Override public void process(GCMetric INPUT, Next<GCMetric> next) {
public Graph<GCMetric> createIfAbsent() {
Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(1, GCMetric.class);
graph.addNode();
return graph;
}
}
......@@ -16,24 +16,22 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.table.register.Instance;
/**
* @author peng-yongsheng
*/
public class ClusterWorkerContext extends WorkerContext {
public class InstHeartBeatPersistenceWorker implements NodeProcessor<Instance, Instance> {
private List<AbstractRemoteWorkerProvider> providers = new ArrayList<>();
public List<AbstractRemoteWorkerProvider> getProviders() {
return providers;
@Override public int id() {
return 0;
}
@Override
public void putProvider(AbstractRemoteWorkerProvider provider) {
providers.add(provider);
@Override public void process(Instance INPUT, Next<Instance> next) {
}
}
......@@ -16,16 +16,22 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.remote.RoutingRule;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
/**
* @author peng-yongsheng
*/
public interface Role {
public class MemoryMetricPersistenceWorker implements NodeProcessor<MemoryMetric, MemoryMetric> {
String roleName();
@Override public int id() {
return 0;
}
RoutingRule routingRule();
@Override public void process(MemoryMetric INPUT, Next<MemoryMetric> next) {
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
/**
* @author peng-yongsheng
*/
/**
 * Graph node intended to persist {@link MemoryPoolMetric} records.
 * NOTE(review): process() is currently an empty stub — metrics reaching this node
 * are dropped; persistence logic appears not yet implemented in this commit.
 */
public class MemoryPoolMetricPersistenceWorker implements NodeProcessor<MemoryPoolMetric, MemoryPoolMetric> {
@Override public int id() {
return 0;
}
@Override public void process(MemoryPoolMetric INPUT, Next<MemoryPoolMetric> next) {
}
}
......@@ -28,6 +28,12 @@ public abstract class Data extends AbstractHashMessage {
private Integer[] dataIntegers;
private Boolean[] dataBooleans;
private byte[][] dataBytes;
private final Column[] stringColumns;
private final Column[] longColumns;
private final Column[] doubleColumns;
private final Column[] integerColumns;
private final Column[] booleanColumns;
private final Column[] byteColumns;
public Data(String id, Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
Column[] integerColumns, Column[] booleanColumns, Column[] byteColumns) {
......@@ -39,6 +45,12 @@ public abstract class Data extends AbstractHashMessage {
this.dataIntegers = new Integer[integerColumns.length];
this.dataBooleans = new Boolean[booleanColumns.length];
this.dataBytes = new byte[byteColumns.length][];
this.stringColumns = stringColumns;
this.longColumns = longColumns;
this.doubleColumns = doubleColumns;
this.integerColumns = integerColumns;
this.booleanColumns = booleanColumns;
this.byteColumns = byteColumns;
}
public int getDataStringsCount() {
......@@ -117,6 +129,33 @@ public abstract class Data extends AbstractHashMessage {
return dataStrings[0];
}
/**
 * Merges {@code newData} into this instance column by column. For every typed
 * column the column's configured operation decides how the incoming value is
 * combined with the value already held here; the result overwrites the local slot.
 * Assumes {@code newData} was built against the same column layout as this object.
 */
public void mergeData(Data newData) {
    for (int idx = 0; idx < stringColumns.length; idx++) {
        this.dataStrings[idx] = stringColumns[idx].getOperation().operate(newData.getDataString(idx), this.dataStrings[idx]);
    }
    for (int idx = 0; idx < longColumns.length; idx++) {
        this.dataLongs[idx] = longColumns[idx].getOperation().operate(newData.getDataLong(idx), this.dataLongs[idx]);
    }
    for (int idx = 0; idx < doubleColumns.length; idx++) {
        this.dataDoubles[idx] = doubleColumns[idx].getOperation().operate(newData.getDataDouble(idx), this.dataDoubles[idx]);
    }
    for (int idx = 0; idx < integerColumns.length; idx++) {
        this.dataIntegers[idx] = integerColumns[idx].getOperation().operate(newData.getDataInteger(idx), this.dataIntegers[idx]);
    }
    for (int idx = 0; idx < booleanColumns.length; idx++) {
        this.dataBooleans[idx] = booleanColumns[idx].getOperation().operate(newData.getDataBoolean(idx), this.dataBooleans[idx]);
    }
    for (int idx = 0; idx < byteColumns.length; idx++) {
        this.dataBytes[idx] = byteColumns[idx].getOperation().operate(newData.getDataBytes(idx), this.dataBytes[idx]);
    }
}
@Override public String toString() {
StringBuilder dataStr = new StringBuilder();
dataStr.append("string: [");
......
......@@ -18,9 +18,10 @@
package org.skywalking.apm.collector.queue.base;
import org.skywalking.apm.collector.core.framework.Executor;
/**
* @author peng-yongsheng
*/
public interface QueueExecutor {
void execute(Object message);
public interface QueueExecutor extends Executor {
}
......@@ -23,6 +23,6 @@ import java.util.List;
/**
* @author peng-yongsheng
*/
public interface IBatchDAO {
public interface IBatchDAO extends DAO {
void batchPersistence(List<?> batchCollection);
}
......@@ -18,13 +18,15 @@
package org.skywalking.apm.collector.storage.base.dao;
import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public interface IPersistenceDAO<Insert, Update, Data> {
Data get(String id);
public interface IPersistenceDAO<Insert, Update, DataImpl extends Data> {
DataImpl get(String id);
Insert prepareBatchInsert(Data data);
Insert prepareBatchInsert(DataImpl data);
Update prepareBatchUpdate(Data data);
Update prepareBatchUpdate(DataImpl data);
}
......@@ -18,8 +18,10 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface ICpuMetricStreamDAO {
public interface ICpuMetricStreamDAO<Insert, Update, Data> extends IPersistenceDAO<Insert, Update, Data> {
}
......@@ -20,10 +20,13 @@ package org.skywalking.apm.collector.storage.service;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.collector.storage.base.dao.DAO;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
/** Module service that hands out storage DAO implementations by their interface type. */
public interface DAOService extends Service {
// Generic DAO lookup by interface class.
DAO get(Class<? extends DAO> daoInterfaceClass);
// Lookup specialized for persistence DAOs, used by the persistence workers.
IPersistenceDAO getPersistenceDAO(Class<? extends IPersistenceDAO> daoInterfaceClass);
}
......@@ -56,11 +56,23 @@ public class CpuMetric extends Data {
return getDataInteger(0);
}
// Instance id is stored in integer column slot 0.
public void setInstanceId(Integer instanceId) {
setDataInteger(0, instanceId);
}
// CPU usage percentage, double column slot 0.
public Double getUsagePercent() {
return getDataDouble(0);
}
public void setUsagePercent(Double usagePercent) {
setDataDouble(0, usagePercent);
}
// Time bucket, long column slot 0.
public Long getTimeBucket() {
return getDataLong(0);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(0, timeBucket);
}
}
......@@ -57,19 +57,39 @@ public class GCMetric extends Data {
return getDataLong(0);
}
// GC count, long column slot 0.
public void setCount(Long count) {
setDataLong(0, count);
}
// GC time, long column slot 1.
public Long getTime() {
return getDataLong(1);
}
public void setTime(Long time) {
setDataLong(1, time);
}
// Time bucket, long column slot 2.
public Long getTimeBucket() {
return getDataLong(2);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(2, timeBucket);
}
// Instance id, integer column slot 0.
public Integer getInstanceId() {
return getDataInteger(0);
}
public void setInstanceId(Integer instanceId) {
setDataInteger(0, instanceId);
}
// GC phrase (young/old generation marker), integer column slot 1.
public Integer getPhrase() {
return getDataInteger(1);
}
public void setPhrase(Integer phrase) {
setDataInteger(1, phrase);
}
}
......@@ -60,27 +60,55 @@ public class MemoryMetric extends Data {
return getDataLong(0);
}
// Initial memory size, long column slot 0.
public void setInit(Long init) {
setDataLong(0, init);
}
// Max memory size, long column slot 1.
public Long getMax() {
return getDataLong(1);
}
public void setMax(Long max) {
setDataLong(1, max);
}
// Used memory, long column slot 2.
public Long getUsed() {
return getDataLong(2);
}
public void setUsed(Long used) {
setDataLong(2, used);
}
// Committed memory, long column slot 3.
public Long getCommitted() {
return getDataLong(3);
}
public void setCommitted(Long committed) {
setDataLong(3, committed);
}
// Time bucket, long column slot 4.
public Long getTimeBucket() {
return getDataLong(4);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(4, timeBucket);
}
// Heap/non-heap flag, boolean column slot 0.
public Boolean getIsHeap() {
return getDataBoolean(0);
}
public void setIsHeap(Boolean isHeap) {
setDataBoolean(0, isHeap);
}
// Instance id, integer column slot 0.
public Integer getInstanceId() {
return getDataInteger(0);
}
public void setInstanceId(Integer instanceId) {
setDataInteger(0, instanceId);
}
}
......@@ -59,27 +59,55 @@ public class MemoryPoolMetric extends Data {
return getDataLong(0);
}
// Initial pool size, long column slot 0.
public void setInit(Long init) {
setDataLong(0, init);
}
// Max pool size, long column slot 1.
public Long getMax() {
return getDataLong(1);
}
public void setMax(Long max) {
setDataLong(1, max);
}
// Used pool memory, long column slot 2.
public Long getUsed() {
return getDataLong(2);
}
public void setUsed(Long used) {
setDataLong(2, used);
}
// Committed pool memory, long column slot 3.
public Long getCommitted() {
return getDataLong(3);
}
public void setCommitted(Long committed) {
setDataLong(3, committed);
}
// Time bucket, long column slot 4.
public Long getTimeBucket() {
return getDataLong(4);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(4, timeBucket);
}
// Instance id, integer column slot 0.
public Integer getInstanceId() {
return getDataInteger(0);
}
public void setInstanceId(Integer instanceId) {
setDataInteger(0, instanceId);
}
// Memory pool type ordinal, integer column slot 1.
public Integer getPoolType() {
return getDataInteger(1);
}
public void setPoolType(Integer poolType) {
setDataInteger(1, poolType);
}
}
......@@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ICpuMetricStreamDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
......@@ -33,7 +32,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class CpuMetricEsStreamDAO extends EsDAO implements ICpuMetricStreamDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, CpuMetric> {
public class CpuMetricEsStreamDAO extends EsDAO implements ICpuMetricStreamDAO<IndexRequestBuilder, UpdateRequestBuilder, CpuMetric> {
private final Logger logger = LoggerFactory.getLogger(CpuMetricEsStreamDAO.class);
......
......@@ -27,38 +27,7 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor;
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractLocalAsyncWorker extends AbstractWorker<LocalAsyncWorkerRef> implements QueueExecutor {
private LocalAsyncWorkerRef workerRef;
/**
* Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
*
* @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use
* to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
*/
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
/**
* The asynchronous worker always use to persistence data into db, this is the end of the streaming,
* so usually no need to create the next worker instance at the time of this worker instance create.
*
* @throws ProviderNotFoundException When worker provider not found, it will be throw this exception.
*/
@Override
public void preStart() throws ProviderNotFoundException {
}
@Override protected final LocalAsyncWorkerRef getSelf() {
return workerRef;
}
@Override protected final void putSelfRef(LocalAsyncWorkerRef workerRef) {
this.workerRef = workerRef;
}
public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor {
/**
* Receive message
......
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
......@@ -29,24 +30,19 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
public abstract int queueSize();
private final DAOService daoService;
private final QueueCreatorService queueCreatorService;
public AbstractLocalAsyncWorkerProvider(QueueCreatorService queueCreatorService) {
public AbstractLocalAsyncWorkerProvider(DAOService daoService, QueueCreatorService queueCreatorService) {
this.daoService = daoService;
this.queueCreatorService = queueCreatorService;
}
@Override
final public WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException {
T localAsyncWorker = workerInstance(getClusterContext());
localAsyncWorker.preStart();
T localAsyncWorker = workerInstance(daoService);
workerCreateListener.addWorker(localAsyncWorker);
QueueEventHandler queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), queueEventHandler);
getClusterContext().put(workerRef);
localAsyncWorker.putSelfRef(workerRef);
return workerRef;
return new LocalAsyncWorkerRef(queueEventHandler);
}
}
......@@ -18,25 +18,17 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.framework.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorker<S extends WorkerRef> {
public abstract class AbstractWorker implements Executor {
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
private final Role role;
private final ClusterWorkerContext clusterContext;
public AbstractWorker(Role role, ClusterWorkerContext clusterContext) {
this.role = role;
this.clusterContext = clusterContext;
}
/**
* The data process logic in this method.
*
......@@ -45,17 +37,11 @@ public abstract class AbstractWorker<S extends WorkerRef> {
*/
protected abstract void onWork(Object message) throws WorkerException;
public abstract void preStart() throws ProviderNotFoundException;
final public ClusterWorkerContext getClusterContext() {
return clusterContext;
/**
 * Framework entry point: processes one queued message by delegating to onWork(Object).
 * A WorkerException is logged rather than propagated, so one failing message does not
 * break the consuming executor; any other (unchecked) exception still escapes.
 *
 * @param message the payload to process; concrete semantics are defined by onWork
 */
@Override public final void execute(Object message) {
try {
onWork(message);
} catch (WorkerException e) {
// Swallow after logging on purpose: keep the worker alive for subsequent messages.
logger.error(e.getMessage(), e);
}
}
final public Role getRole() {
return role;
}
protected abstract S getSelf();
protected abstract void putSelfRef(S workerRef);
}
......@@ -18,22 +18,11 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorkerProvider<T extends AbstractWorker> implements Provider {
private ClusterWorkerContext clusterContext;
public abstract Role role();
public abstract T workerInstance(ClusterWorkerContext clusterContext);
final public void setClusterContext(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
final protected ClusterWorkerContext getClusterContext() {
return clusterContext;
}
public abstract T workerInstance(DAOService daoService);
}
......@@ -27,8 +27,7 @@ public class LocalAsyncWorkerRef extends WorkerRef {
private QueueEventHandler queueEventHandler;
public LocalAsyncWorkerRef(Role role, QueueEventHandler queueEventHandler) {
super(role);
public LocalAsyncWorkerRef(QueueEventHandler queueEventHandler) {
this.queueEventHandler = queueEventHandler;
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.worker.base;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
/**
 * Shared registry of roles and worker references within a stream module.
 *
 * <p>Providers register worker references through {@link #put(WorkerRef)}; consumers
 * resolve them by role via {@link #lookup(Role)}. Remote worker references that live
 * in the same JVM ({@code isAcrossJVM() == false}) are additionally indexed by role
 * name for {@link #lookupInSide(String)}.
 *
 * <p>NOTE(review): no synchronization is visible here — presumably all registration
 * happens during single-threaded bootstrap; confirm before mutating concurrently.
 *
 * @author peng-yongsheng
 */
public abstract class WorkerContext implements Context {

    private final Logger logger = LoggerFactory.getLogger(WorkerContext.class);

    // Role name -> in-JVM remote worker reference (only refs whose isAcrossJVM() is false).
    private final Map<String, RemoteWorkerRef> remoteWorkerRefs;
    // Role name -> every worker reference registered for that role.
    private final Map<String, List<WorkerRef>> roleWorkers;
    // Role name -> role definition.
    private final Map<String, Role> roles;

    WorkerContext() {
        this.roleWorkers = new HashMap<>();
        this.roles = new HashMap<>();
        this.remoteWorkerRefs = new HashMap<>();
    }

    /**
     * Resolves all workers registered for the given role.
     *
     * @param role the role to resolve
     * @return a selector-aware view over the role's worker references
     * @throws WorkerNotFoundException if no worker was registered for the role
     */
    @Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
        List<WorkerRef> refs = roleWorkers.get(role.roleName());
        if (refs != null) {
            return new WorkerRefs(refs, role.workerSelector());
        }
        throw new WorkerNotFoundException("role=" + role.roleName() + ", no available worker.");
    }

    /**
     * Resolves the in-JVM remote worker reference registered under the given role name.
     *
     * @throws WorkerNotFoundException if no in-JVM remote worker exists for the role
     */
    @Override final public RemoteWorkerRef lookupInSide(String roleName) throws WorkerNotFoundException {
        RemoteWorkerRef ref = remoteWorkerRefs.get(roleName);
        if (ref != null) {
            return ref;
        }
        throw new WorkerNotFoundException("role=" + roleName + ", no available worker.");
    }

    /** Registers a role definition, keyed by its role name (last registration wins). */
    public final void putRole(Role role) {
        roles.put(role.roleName(), role);
    }

    /** Returns the role registered under the given name, or {@code null} if absent. */
    public final Role getRole(String roleName) {
        return roles.get(roleName);
    }

    /**
     * Registers a worker reference under its role name. In-JVM remote worker
     * references are additionally indexed for {@link #lookupInSide(String)}.
     */
    @Override final public void put(WorkerRef workerRef) {
        String roleName = workerRef.getRole().roleName();
        logger.debug("put worker reference into context, role name: {}", roleName);

        // computeIfAbsent replaces the original redundant containsKey + putIfAbsent + get sequence.
        roleWorkers.computeIfAbsent(roleName, name -> new ArrayList<>()).add(workerRef);

        if (workerRef instanceof RemoteWorkerRef) {
            RemoteWorkerRef remoteWorkerRef = (RemoteWorkerRef)workerRef;
            if (!remoteWorkerRef.isAcrossJVM()) {
                remoteWorkerRefs.put(roleName, remoteWorkerRef);
            }
        }
    }

    /**
     * Removes ALL worker references registered under the given reference's role name —
     * not just this single reference. NOTE(review): this mirrors the original behavior;
     * confirm whether per-reference removal was intended. The remote index is not
     * cleaned up here either, matching the original.
     */
    @Override final public void remove(WorkerRef workerRef) {
        roleWorkers.remove(workerRef.getRole().roleName());
    }
}
......@@ -22,15 +22,5 @@ package org.skywalking.apm.collector.stream.worker.base;
* @author peng-yongsheng
*/
public abstract class WorkerRef {
private Role role;
public WorkerRef(Role role) {
this.role = role;
}
final public Role getRole() {
return role;
}
public abstract void tell(Object message) throws WorkerInvokeException;
}
......@@ -24,13 +24,10 @@ import java.util.Map;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.queue.base.EndOfBatchCommand;
import org.skywalking.apm.collector.storage.base.dao.DAOContainer;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.Role;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
......@@ -43,15 +40,12 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
private DataCache dataCache;
private final DAOService daoService;
private final DataCache dataCache;
public PersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
public PersistenceWorker(DAOService daoService) {
this.dataCache = new DataCache();
this.daoService = daoService;
}
@Override protected final void onWork(Object message) throws WorkerException {
......@@ -71,7 +65,7 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
dataCache.switchPointer();
List<?> collection = buildBatchCollection();
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class);
dao.batchPersistence(collection);
}
} finally {
......@@ -107,9 +101,9 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
List<Object> updateBatchCollection = new LinkedList<>();
dataMap.forEach((id, data) -> {
if (needMergeDBData()) {
Data dbData = persistenceDAO().get(id, getRole().dataDefine());
Data dbData = persistenceDAO().get(id);
if (ObjectUtils.isNotEmpty(dbData)) {
getRole().dataDefine().mergeData(data, dbData);
dbData.mergeData(data);
try {
updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(data));
} catch (Throwable t) {
......@@ -137,12 +131,12 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
private void aggregate(Object message) {
dataCache.writing();
Data data = (Data)message;
Data newData = (Data)message;
if (dataCache.containsKey(data.getId())) {
getRole().dataDefine().mergeData(dataCache.get(data.getId()), data);
if (dataCache.containsKey(newData.getId())) {
dataCache.get(newData.getId()).mergeData(newData);
} else {
dataCache.put(data.getId(), data);
dataCache.put(newData.getId(), newData);
}
dataCache.finishWriting();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册