提交 16611207 编写于 作者: P pengys5

Instance health web ui working with collector success.

#365
上级 0149dcf2
...@@ -10,6 +10,7 @@ import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersisten ...@@ -10,6 +10,7 @@ import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersisten
import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker; import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine; import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine; import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine;
...@@ -19,7 +20,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext; ...@@ -19,7 +20,6 @@ import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.CPU; import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream; import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC; import org.skywalking.apm.network.proto.GC;
...@@ -139,6 +139,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe ...@@ -139,6 +139,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
gcMetric.setCount(gc.getCount()); gcMetric.setCount(gc.getCount());
gcMetric.setTime(gc.getTime()); gcMetric.setTime(gc.getTime());
gcMetric.setTimeBucket(timeBucket); gcMetric.setTimeBucket(timeBucket);
gcMetric.setS5TimeBucket(TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(timeBucket));
try { try {
logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId()); logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId());
context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData()); context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData());
......
...@@ -4,17 +4,21 @@ import java.util.HashMap; ...@@ -4,17 +4,21 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable; import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO; import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data; import org.slf4j.Logger;
import org.skywalking.apm.collector.storage.define.DataDefine; import org.slf4j.LoggerFactory;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> { public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(CpuMetricEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) { @Override public Data get(String id, DataDefine dataDefine) {
return null; return null;
} }
...@@ -25,6 +29,7 @@ public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistence ...@@ -25,6 +29,7 @@ public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistence
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0)); source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0)); source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("prepare cpu metric batch insert, id: {}", data.getDataString(0));
return getClient().prepareIndex(CpuMetricTable.TABLE, data.getDataString(0)).setSource(source); return getClient().prepareIndex(CpuMetricTable.TABLE, data.getDataString(0)).setSource(source);
} }
......
...@@ -4,11 +4,11 @@ import java.util.HashMap; ...@@ -4,11 +4,11 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable; import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO; import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/** /**
* @author pengys5 * @author pengys5
...@@ -22,10 +22,11 @@ public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO, IPersistenceDA ...@@ -22,10 +22,11 @@ public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO, IPersistenceDA
@Override public IndexRequestBuilder prepareBatchInsert(Data data) { @Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>(); Map<String, Object> source = new HashMap<>();
source.put(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0)); source.put(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(0)); source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(1));
source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0)); source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0));
source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1)); source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1));
source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2)); source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2));
source.put(GCMetricTable.COLUMN_5S_TIME_BUCKET, data.getDataLong(3));
return getClient().prepareIndex(GCMetricTable.TABLE, data.getDataString(0)).setSource(source); return getClient().prepareIndex(GCMetricTable.TABLE, data.getDataString(0)).setSource(source);
} }
......
package org.skywalking.apm.collector.agentjvm.worker.gc.define; package org.skywalking.apm.collector.agentjvm.worker.gc.define;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
/** /**
* @author pengys5 * @author pengys5
...@@ -31,5 +31,6 @@ public class GCMetricEsTableDefine extends ElasticSearchTableDefine { ...@@ -31,5 +31,6 @@ public class GCMetricEsTableDefine extends ElasticSearchTableDefine {
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_COUNT, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_COUNT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_5S_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
} }
} }
...@@ -20,5 +20,6 @@ public class GCMetricH2TableDefine extends H2TableDefine { ...@@ -20,5 +20,6 @@ public class GCMetricH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_COUNT, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_COUNT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_5S_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
} }
} }
...@@ -2,6 +2,8 @@ package org.skywalking.apm.collector.agentjvm.grpc.handler; ...@@ -2,6 +2,8 @@ package org.skywalking.apm.collector.agentjvm.grpc.handler;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.network.proto.CPU; import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.GC; import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.GCPhrase; import org.skywalking.apm.network.proto.GCPhrase;
...@@ -27,17 +29,16 @@ public class JVMMetricsServiceHandlerTestCase { ...@@ -27,17 +29,16 @@ public class JVMMetricsServiceHandlerTestCase {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build(); ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = JVMMetricsServiceGrpc.newBlockingStub(channel); stub = JVMMetricsServiceGrpc.newBlockingStub(channel);
final long timeInterval = 1;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> multiInstanceJvmSend(), 1, timeInterval, TimeUnit.SECONDS);
}
public static void multiInstanceJvmSend() {
buildJvmMetric(2); buildJvmMetric(2);
buildJvmMetric(3); buildJvmMetric(3);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
public static void buildJvmMetric(int instanceId) { private static void buildJvmMetric(int instanceId) {
JVMMetrics.Builder jvmMetricsBuilder = JVMMetrics.newBuilder(); JVMMetrics.Builder jvmMetricsBuilder = JVMMetrics.newBuilder();
jvmMetricsBuilder.setApplicationInstanceId(instanceId); jvmMetricsBuilder.setApplicationInstanceId(instanceId);
...@@ -88,10 +89,16 @@ public class JVMMetricsServiceHandlerTestCase { ...@@ -88,10 +89,16 @@ public class JVMMetricsServiceHandlerTestCase {
} }
private static void buildGcMetric(JVMMetric.Builder jvmMetric) { private static void buildGcMetric(JVMMetric.Builder jvmMetric) {
GC.Builder gcBuilder = GC.newBuilder(); GC.Builder newGcBuilder = GC.newBuilder();
gcBuilder.setPhrase(GCPhrase.NEW); newGcBuilder.setPhrase(GCPhrase.NEW);
gcBuilder.setCount(2); newGcBuilder.setCount(2);
gcBuilder.setTime(100); newGcBuilder.setTime(100);
jvmMetric.addGc(gcBuilder.build()); jvmMetric.addGc(newGcBuilder.build());
GC.Builder oldGcBuilder = GC.newBuilder();
oldGcBuilder.setPhrase(GCPhrase.OLD);
oldGcBuilder.setCount(2);
oldGcBuilder.setTime(100);
jvmMetric.addGc(oldGcBuilder.build());
} }
} }
...@@ -5,7 +5,7 @@ import java.util.List; ...@@ -5,7 +5,7 @@ import java.util.List;
import org.skywalking.apm.collector.storage.define.global.GlobalTraceDataDefine; import org.skywalking.apm.collector.storage.define.global.GlobalTraceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener; import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.worker.instance.performance; package org.skywalking.apm.collector.agentstream.worker.instance.performance;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.SpanObject; import org.skywalking.apm.network.proto.SpanObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -48,6 +48,7 @@ public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpan ...@@ -48,6 +48,7 @@ public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpan
instPerformance.setCallTimes(1); instPerformance.setCallTimes(1);
instPerformance.setCostTotal(cost); instPerformance.setCostTotal(cost);
instPerformance.setTimeBucket(timeBucket); instPerformance.setTimeBucket(timeBucket);
instPerformance.setS5TimeBucket(TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(timeBucket));
try { try {
logger.debug("send to instance performance persistence worker, id: {}", instPerformance.getId()); logger.debug("send to instance performance persistence worker, id: {}", instPerformance.getId());
......
...@@ -5,27 +5,33 @@ import java.util.Map; ...@@ -5,27 +5,33 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data; import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine; import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> { public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(InstPerformanceEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) { @Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get(); GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get();
if (getResponse.isExists()) { if (getResponse.isExists()) {
logger.debug("id: {} is exist", id);
Data data = dataDefine.build(id); Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource(); Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(InstPerformanceTable.COLUMN_APPLICATION_ID)); data.setDataInteger(0, (Integer)source.get(InstPerformanceTable.COLUMN_APPLICATION_ID));
data.setDataInteger(1, (Integer)source.get(InstPerformanceTable.COLUMN_INSTANCE_ID)); data.setDataInteger(1, (Integer)source.get(InstPerformanceTable.COLUMN_INSTANCE_ID));
data.setDataInteger(2, (Integer)source.get(InstPerformanceTable.COLUMN_CALL_TIMES)); data.setDataInteger(2, (Integer)source.get(InstPerformanceTable.COLUMN_CALL_TIMES));
data.setDataLong(0, (Long)source.get(InstPerformanceTable.COLUMN_COST_TOTAL)); data.setDataLong(0, ((Number)source.get(InstPerformanceTable.COLUMN_COST_TOTAL)).longValue());
data.setDataLong(1, (Long)source.get(InstPerformanceTable.COLUMN_TIME_BUCKET)); data.setDataLong(1, ((Number)source.get(InstPerformanceTable.COLUMN_TIME_BUCKET)).longValue());
data.setDataLong(2, ((Number)source.get(InstPerformanceTable.COLUMN_5S_TIME_BUCKET)).longValue());
return data; return data;
} else { } else {
return null; return null;
...@@ -39,6 +45,7 @@ public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, ...@@ -39,6 +45,7 @@ public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO,
source.put(InstPerformanceTable.COLUMN_CALL_TIMES, data.getDataInteger(2)); source.put(InstPerformanceTable.COLUMN_CALL_TIMES, data.getDataInteger(2));
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0)); source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0));
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1)); source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1));
source.put(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, data.getDataLong(2));
return getClient().prepareIndex(InstPerformanceTable.TABLE, data.getDataString(0)).setSource(source); return getClient().prepareIndex(InstPerformanceTable.TABLE, data.getDataString(0)).setSource(source);
} }
...@@ -50,6 +57,7 @@ public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, ...@@ -50,6 +57,7 @@ public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO,
source.put(InstPerformanceTable.COLUMN_CALL_TIMES, data.getDataInteger(2)); source.put(InstPerformanceTable.COLUMN_CALL_TIMES, data.getDataInteger(2));
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0)); source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0));
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1)); source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1));
source.put(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, data.getDataLong(2));
return getClient().prepareUpdate(InstPerformanceTable.TABLE, data.getDataString(0)).setDoc(source); return getClient().prepareUpdate(InstPerformanceTable.TABLE, data.getDataString(0)).setDoc(source);
} }
......
package org.skywalking.apm.collector.agentstream.worker.instance.performance.define; package org.skywalking.apm.collector.agentstream.worker.instance.performance.define;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine; import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
/** /**
* @author pengys5 * @author pengys5
...@@ -31,5 +31,6 @@ public class InstPerformanceEsTableDefine extends ElasticSearchTableDefine { ...@@ -31,5 +31,6 @@ public class InstPerformanceEsTableDefine extends ElasticSearchTableDefine {
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name())); addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
} }
} }
package org.skywalking.apm.collector.agentstream.worker.instance.performance.define; package org.skywalking.apm.collector.agentstream.worker.instance.performance.define;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine; import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine; import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
/** /**
* @author pengys5 * @author pengys5
...@@ -20,5 +20,6 @@ public class InstPerformanceH2TableDefine extends H2TableDefine { ...@@ -20,5 +20,6 @@ public class InstPerformanceH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, H2ColumnDefine.Type.Int.name())); addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name())); addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
} }
} }
...@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener; ...@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils; import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
...@@ -7,7 +7,7 @@ import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine; ...@@ -7,7 +7,7 @@ import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener; import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils; import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
...@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener; ...@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener; import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils; import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
...@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener; ...@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener; import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils; import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
...@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener ...@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener; import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostDataDefine; import org.skywalking.apm.collector.storage.define.segment.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils; import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
...@@ -6,7 +6,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener ...@@ -6,7 +6,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener; import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.storage.define.service.ServiceEntryDataDefine; import org.skywalking.apm.collector.storage.define.service.ServiceEntryDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils; import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
...@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener ...@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener; import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefDataDefine; import org.skywalking.apm.collector.storage.define.serviceref.ServiceRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils; import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper; import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext; import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
package org.skywalking.apm.collector.agentstream.mock; package org.skywalking.apm.collector.agentstream.mock;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException; import java.io.IOException;
import org.skywalking.apm.collector.agentstream.HttpClientTools; import org.skywalking.apm.collector.agentstream.HttpClientTools;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO; import org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO; import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.CollectorException; import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
/** /**
* @author pengys5 * @author pengys5
...@@ -35,14 +37,31 @@ public class SegmentPost { ...@@ -35,14 +37,31 @@ public class SegmentPost {
ApplicationDataDefine.Application providerApplication = new ApplicationDataDefine.Application("3", "dubbox-provider", 3); ApplicationDataDefine.Application providerApplication = new ApplicationDataDefine.Application("3", "dubbox-provider", 3);
applicationEsDAO.save(providerApplication); applicationEsDAO.save(providerApplication);
JsonElement consumer = JsonFileReader.INSTANCE.read("json/segment/normal/dubbox-consumer.json"); while (true) {
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString()); JsonElement consumer = JsonFileReader.INSTANCE.read("json/segment/normal/dubbox-consumer.json");
modifyTime(consumer);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
Thread.sleep(5000); JsonElement provider = JsonFileReader.INSTANCE.read("json/segment/normal/dubbox-provider.json");
modifyTime(provider);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
JsonElement provider = JsonFileReader.INSTANCE.read("json/segment/normal/dubbox-provider.json"); Thread.sleep(200);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString()); }
}
Thread.sleep(5000); private static void modifyTime(JsonElement jsonElement) {
JsonArray segmentArray = jsonElement.getAsJsonArray();
for (JsonElement element : segmentArray) {
JsonObject segmentObj = element.getAsJsonObject();
JsonArray spans = segmentObj.get("sg").getAsJsonObject().get("ss").getAsJsonArray();
for (JsonElement span : spans) {
long startTime = span.getAsJsonObject().get("st").getAsLong();
long endTime = span.getAsJsonObject().get("et").getAsLong();
long currentTime = System.currentTimeMillis();
span.getAsJsonObject().addProperty("st", currentTime);
span.getAsJsonObject().addProperty("et", currentTime + (endTime - startTime));
}
}
} }
} }
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
</layout>
</appender>
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="INFO"/>
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<root level="debug">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
\ No newline at end of file
...@@ -20,13 +20,14 @@ public abstract class StorageInstaller { ...@@ -20,13 +20,14 @@ public abstract class StorageInstaller {
defineFilter(tableDefines); defineFilter(tableDefines);
for (TableDefine tableDefine : tableDefines) { for (TableDefine tableDefine : tableDefines) {
tableDefine.initialize();
if (!isExists(client, tableDefine)) { if (!isExists(client, tableDefine)) {
logger.info("table: {} not exists", tableDefine.getName()); logger.info("table: {} not exists", tableDefine.getName());
tableDefine.initialize();
createTable(client, tableDefine); createTable(client, tableDefine);
} else { } else {
logger.info("table: {} exists", tableDefine.getName()); logger.info("table: {} exists", tableDefine.getName());
// deleteTable(client, tableDefine); deleteTable(client, tableDefine);
createTable(client, tableDefine);
} }
} }
} catch (DefineException e) { } catch (DefineException e) {
......
...@@ -21,6 +21,7 @@ public class Data extends AbstractHashMessage { ...@@ -21,6 +21,7 @@ public class Data extends AbstractHashMessage {
int booleanCapacity, int byteCapacity) { int booleanCapacity, int byteCapacity) {
super(id); super(id);
this.dataStrings = new String[stringCapacity]; this.dataStrings = new String[stringCapacity];
this.dataStrings[0] = id;
this.dataLongs = new Long[longCapacity]; this.dataLongs = new Long[longCapacity];
this.dataDoubles = new Double[doubleCapacity]; this.dataDoubles = new Double[doubleCapacity];
this.dataIntegers = new Integer[integerCapacity]; this.dataIntegers = new Integer[integerCapacity];
......
package org.skywalking.apm.collector.stream.worker.util; package org.skywalking.apm.collector.core.util;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Calendar; import java.util.Calendar;
...@@ -43,6 +43,17 @@ public enum TimeBucketUtils { ...@@ -43,6 +43,17 @@ public enum TimeBucketUtils {
return Long.valueOf(timeStr); return Long.valueOf(timeStr);
} }
public long getFiveSecondTimeBucket(long secondTimeBucket) {
long mantissa = secondTimeBucket % 10;
if (mantissa < 5) {
return (secondTimeBucket / 10) * 10;
} else if (mantissa == 5) {
return secondTimeBucket;
} else {
return ((secondTimeBucket / 10) + 1) * 10;
}
}
public long changeToUTCTimeBucket(long timeBucket) { public long changeToUTCTimeBucket(long timeBucket) {
String timeBucketStr = String.valueOf(timeBucket); String timeBucketStr = String.valueOf(timeBucket);
......
package org.skywalking.apm.collector.core.utils;
import org.junit.Assert;
import org.junit.Test;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
/**
* @author pengys5
*/
public class TimeBucketUtilsTestCase {
@Test
public void testGetFiveSecondTimeBucket() {
long fiveSecondTimeBucket = TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(20170804224812L);
Assert.assertEquals(20170804224810L, fiveSecondTimeBucket);
fiveSecondTimeBucket = TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(20170804224818L);
Assert.assertEquals(20170804224820L, fiveSecondTimeBucket);
fiveSecondTimeBucket = TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(20170804224815L);
Assert.assertEquals(20170804224815L, fiveSecondTimeBucket);
}
}
...@@ -8,4 +8,5 @@ public class CommonTable { ...@@ -8,4 +8,5 @@ public class CommonTable {
public static final String COLUMN_ID = "id"; public static final String COLUMN_ID = "id";
public static final String COLUMN_AGG = "agg"; public static final String COLUMN_AGG = "agg";
public static final String COLUMN_TIME_BUCKET = "time_bucket"; public static final String COLUMN_TIME_BUCKET = "time_bucket";
public static final String COLUMN_5S_TIME_BUCKET = "s5_time_bucket";
} }
...@@ -61,22 +61,28 @@ public abstract class DataDefine { ...@@ -61,22 +61,28 @@ public abstract class DataDefine {
for (int i = 0; i < initialCapacity(); i++) { for (int i = 0; i < initialCapacity(); i++) {
Attribute attribute = attributes[i]; Attribute attribute = attributes[i];
if (AttributeType.STRING.equals(attribute.getType())) { if (AttributeType.STRING.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataString(stringPosition), oldData.getDataString(stringPosition)); String stringData = attribute.getOperation().operate(newData.getDataString(stringPosition), oldData.getDataString(stringPosition));
newData.setDataString(stringPosition, stringData);
stringPosition++; stringPosition++;
} else if (AttributeType.LONG.equals(attribute.getType())) { } else if (AttributeType.LONG.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataLong(longPosition), oldData.getDataLong(longPosition)); Long longData = attribute.getOperation().operate(newData.getDataLong(longPosition), oldData.getDataLong(longPosition));
newData.setDataLong(longPosition, longData);
longPosition++; longPosition++;
} else if (AttributeType.DOUBLE.equals(attribute.getType())) { } else if (AttributeType.DOUBLE.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataDouble(doublePosition), oldData.getDataDouble(doublePosition)); Double doubleData = attribute.getOperation().operate(newData.getDataDouble(doublePosition), oldData.getDataDouble(doublePosition));
newData.setDataDouble(doublePosition, doubleData);
doublePosition++; doublePosition++;
} else if (AttributeType.INTEGER.equals(attribute.getType())) { } else if (AttributeType.INTEGER.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataInteger(integerPosition), oldData.getDataInteger(integerPosition)); Integer integerData = attribute.getOperation().operate(newData.getDataInteger(integerPosition), oldData.getDataInteger(integerPosition));
newData.setDataInteger(integerPosition, integerData);
integerPosition++; integerPosition++;
} else if (AttributeType.BOOLEAN.equals(attribute.getType())) { } else if (AttributeType.BOOLEAN.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataBoolean(booleanPosition), oldData.getDataBoolean(booleanPosition)); Boolean booleanData = attribute.getOperation().operate(newData.getDataBoolean(booleanPosition), oldData.getDataBoolean(booleanPosition));
integerPosition++; newData.setDataBoolean(booleanPosition, booleanData);
booleanPosition++;
} else if (AttributeType.BYTE.equals(attribute.getType())) { } else if (AttributeType.BYTE.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataBytes(bytePosition), oldData.getDataBytes(integerPosition)); byte[] byteData = attribute.getOperation().operate(newData.getDataBytes(bytePosition), oldData.getDataBytes(integerPosition));
newData.setDataBytes(bytePosition, byteData);
bytePosition++; bytePosition++;
} }
} }
......
...@@ -16,7 +16,7 @@ import org.skywalking.apm.collector.storage.define.DataDefine; ...@@ -16,7 +16,7 @@ import org.skywalking.apm.collector.storage.define.DataDefine;
public class InstPerformanceDataDefine extends DataDefine { public class InstPerformanceDataDefine extends DataDefine {
@Override protected int initialCapacity() { @Override protected int initialCapacity() {
return 6; return 7;
} }
@Override protected void attributeDefine() { @Override protected void attributeDefine() {
...@@ -26,6 +26,7 @@ public class InstPerformanceDataDefine extends DataDefine { ...@@ -26,6 +26,7 @@ public class InstPerformanceDataDefine extends DataDefine {
addAttribute(3, new Attribute(InstPerformanceTable.COLUMN_CALL_TIMES, AttributeType.INTEGER, new AddOperation())); addAttribute(3, new Attribute(InstPerformanceTable.COLUMN_CALL_TIMES, AttributeType.INTEGER, new AddOperation()));
addAttribute(4, new Attribute(InstPerformanceTable.COLUMN_COST_TOTAL, AttributeType.LONG, new AddOperation())); addAttribute(4, new Attribute(InstPerformanceTable.COLUMN_COST_TOTAL, AttributeType.LONG, new AddOperation()));
addAttribute(5, new Attribute(InstPerformanceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation())); addAttribute(5, new Attribute(InstPerformanceTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
} }
@Override public Object deserialize(RemoteData remoteData) { @Override public Object deserialize(RemoteData remoteData) {
...@@ -43,15 +44,18 @@ public class InstPerformanceDataDefine extends DataDefine { ...@@ -43,15 +44,18 @@ public class InstPerformanceDataDefine extends DataDefine {
private int callTimes; private int callTimes;
private long costTotal; private long costTotal;
private long timeBucket; private long timeBucket;
private long s5TimeBucket;
public InstPerformance(String id, int applicationId, int instanceId, int callTimes, long costTotal, public InstPerformance(String id, int applicationId, int instanceId, int callTimes, long costTotal,
long timeBucket) { long timeBucket,
long s5TimeBucket) {
this.id = id; this.id = id;
this.applicationId = applicationId; this.applicationId = applicationId;
this.instanceId = instanceId; this.instanceId = instanceId;
this.callTimes = callTimes; this.callTimes = callTimes;
this.costTotal = costTotal; this.costTotal = costTotal;
this.timeBucket = timeBucket; this.timeBucket = timeBucket;
this.s5TimeBucket = s5TimeBucket;
} }
public InstPerformance() { public InstPerformance() {
...@@ -66,6 +70,7 @@ public class InstPerformanceDataDefine extends DataDefine { ...@@ -66,6 +70,7 @@ public class InstPerformanceDataDefine extends DataDefine {
data.setDataInteger(2, this.callTimes); data.setDataInteger(2, this.callTimes);
data.setDataLong(0, this.costTotal); data.setDataLong(0, this.costTotal);
data.setDataLong(1, this.timeBucket); data.setDataLong(1, this.timeBucket);
data.setDataLong(2, this.s5TimeBucket);
return data; return data;
} }
...@@ -76,6 +81,7 @@ public class InstPerformanceDataDefine extends DataDefine { ...@@ -76,6 +81,7 @@ public class InstPerformanceDataDefine extends DataDefine {
this.callTimes = data.getDataInteger(2); this.callTimes = data.getDataInteger(2);
this.costTotal = data.getDataLong(0); this.costTotal = data.getDataLong(0);
this.timeBucket = data.getDataLong(1); this.timeBucket = data.getDataLong(1);
this.s5TimeBucket = data.getDataLong(2);
return this; return this;
} }
...@@ -126,5 +132,13 @@ public class InstPerformanceDataDefine extends DataDefine { ...@@ -126,5 +132,13 @@ public class InstPerformanceDataDefine extends DataDefine {
public void setApplicationId(int applicationId) { public void setApplicationId(int applicationId) {
this.applicationId = applicationId; this.applicationId = applicationId;
} }
public long getS5TimeBucket() {
return s5TimeBucket;
}
public void setS5TimeBucket(long s5TimeBucket) {
this.s5TimeBucket = s5TimeBucket;
}
} }
} }
package org.skywalking.apm.collector.storage.define.jvm; package org.skywalking.apm.collector.storage.define.jvm;
import org.skywalking.apm.collector.core.framework.UnexpectedException; import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.core.stream.Data; import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.core.stream.Transform; import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation; import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation; import org.skywalking.apm.collector.core.stream.operate.NonOperation;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.storage.define.DataDefine;
/** /**
* @author pengys5 * @author pengys5
...@@ -16,7 +16,7 @@ import org.skywalking.apm.collector.core.stream.operate.NonOperation; ...@@ -16,7 +16,7 @@ import org.skywalking.apm.collector.core.stream.operate.NonOperation;
public class GCMetricDataDefine extends DataDefine { public class GCMetricDataDefine extends DataDefine {
@Override protected int initialCapacity() { @Override protected int initialCapacity() {
return 6; return 7;
} }
@Override protected void attributeDefine() { @Override protected void attributeDefine() {
...@@ -26,6 +26,7 @@ public class GCMetricDataDefine extends DataDefine { ...@@ -26,6 +26,7 @@ public class GCMetricDataDefine extends DataDefine {
addAttribute(3, new Attribute(GCMetricTable.COLUMN_COUNT, AttributeType.LONG, new CoverOperation())); addAttribute(3, new Attribute(GCMetricTable.COLUMN_COUNT, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(GCMetricTable.COLUMN_TIME, AttributeType.LONG, new CoverOperation())); addAttribute(4, new Attribute(GCMetricTable.COLUMN_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(5, new Attribute(GCMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation())); addAttribute(5, new Attribute(GCMetricTable.COLUMN_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
addAttribute(6, new Attribute(GCMetricTable.COLUMN_5S_TIME_BUCKET, AttributeType.LONG, new CoverOperation()));
} }
@Override public Object deserialize(RemoteData remoteData) { @Override public Object deserialize(RemoteData remoteData) {
...@@ -43,14 +44,17 @@ public class GCMetricDataDefine extends DataDefine { ...@@ -43,14 +44,17 @@ public class GCMetricDataDefine extends DataDefine {
private long count; private long count;
private long time; private long time;
private long timeBucket; private long timeBucket;
private long s5TimeBucket;
public GCMetric(String id, int applicationInstanceId, int phrase, long count, long time, long timeBucket) { public GCMetric(String id, int applicationInstanceId, int phrase, long count, long time, long timeBucket,
long s5TimeBucket) {
this.id = id; this.id = id;
this.applicationInstanceId = applicationInstanceId; this.applicationInstanceId = applicationInstanceId;
this.phrase = phrase; this.phrase = phrase;
this.count = count; this.count = count;
this.time = time; this.time = time;
this.timeBucket = timeBucket; this.timeBucket = timeBucket;
this.s5TimeBucket = s5TimeBucket;
} }
public GCMetric() { public GCMetric() {
...@@ -65,6 +69,7 @@ public class GCMetricDataDefine extends DataDefine { ...@@ -65,6 +69,7 @@ public class GCMetricDataDefine extends DataDefine {
data.setDataLong(0, this.count); data.setDataLong(0, this.count);
data.setDataLong(1, this.time); data.setDataLong(1, this.time);
data.setDataLong(2, this.timeBucket); data.setDataLong(2, this.timeBucket);
data.setDataLong(3, this.s5TimeBucket);
return data; return data;
} }
...@@ -75,6 +80,7 @@ public class GCMetricDataDefine extends DataDefine { ...@@ -75,6 +80,7 @@ public class GCMetricDataDefine extends DataDefine {
this.count = data.getDataLong(0); this.count = data.getDataLong(0);
this.time = data.getDataLong(1); this.time = data.getDataLong(1);
this.timeBucket = data.getDataLong(2); this.timeBucket = data.getDataLong(2);
this.s5TimeBucket = data.getDataLong(3);
return this; return this;
} }
...@@ -125,5 +131,13 @@ public class GCMetricDataDefine extends DataDefine { ...@@ -125,5 +131,13 @@ public class GCMetricDataDefine extends DataDefine {
public void setTime(long time) { public void setTime(long time) {
this.time = time; this.time = time;
} }
public long getS5TimeBucket() {
return s5TimeBucket;
}
public void setS5TimeBucket(long s5TimeBucket) {
this.s5TimeBucket = s5TimeBucket;
}
} }
} }
...@@ -4,6 +4,7 @@ import java.util.ArrayList; ...@@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand; import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker; import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext; import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
...@@ -11,7 +12,6 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException; ...@@ -11,7 +12,6 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role; import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException; import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO; import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache; import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -75,12 +75,24 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker { ...@@ -75,12 +75,24 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
Data dbData = persistenceDAO().get(id, getRole().dataDefine()); Data dbData = persistenceDAO().get(id, getRole().dataDefine());
if (ObjectUtils.isNotEmpty(dbData)) { if (ObjectUtils.isNotEmpty(dbData)) {
getRole().dataDefine().mergeData(data, dbData); getRole().dataDefine().mergeData(data, dbData);
updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(data)); try {
updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} else { } else {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data)); try {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} }
} else { } else {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data)); try {
insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
} }
}); });
......
...@@ -4,7 +4,7 @@ import java.util.Calendar; ...@@ -4,7 +4,7 @@ import java.util.Calendar;
import java.util.TimeZone; import java.util.TimeZone;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils;
/** /**
* @author pengys5 * @author pengys5
......
...@@ -24,4 +24,14 @@ public class ApplicationCache { ...@@ -24,4 +24,14 @@ public class ApplicationCache {
return Const.EXCEPTION; return Const.EXCEPTION;
} }
} }
public static String getForUI(int applicationId) {
String applicationCode = get(applicationId);
if (applicationCode.equals("Unknown")) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
applicationCode = dao.getApplicationCode(applicationId);
CACHE.put(applicationId, applicationCode);
}
return applicationCode;
}
} }
...@@ -12,40 +12,51 @@ import org.elasticsearch.action.search.SearchType; ...@@ -12,40 +12,51 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable; import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable; import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.network.proto.GCPhrase; import org.skywalking.apm.network.proto.GCPhrase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author pengys5 * @author pengys5
*/ */
public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO { public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO {
@Override public GCCount getGCCount(long timestamp, int instanceId) { private final Logger logger = LoggerFactory.getLogger(GCMetricEsDAO.class);
@Override public GCCount getGCCount(long s5TimeBucket, int instanceId) {
logger.debug("get gc count, s5TimeBucket: {}, instanceId: {}", s5TimeBucket, instanceId);
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(GCMetricTable.TABLE); SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(GCMetricTable.TABLE);
searchRequestBuilder.setTypes(GCMetricTable.TABLE_TYPE); searchRequestBuilder.setTypes(GCMetricTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
MatchQueryBuilder matchApplicationId = QueryBuilders.matchQuery(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, instanceId); MatchQueryBuilder matchApplicationId = QueryBuilders.matchQuery(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, instanceId);
MatchQueryBuilder matchTimeBucket = QueryBuilders.matchQuery(GCMetricTable.COLUMN_TIME_BUCKET, timestamp); MatchQueryBuilder matchTimeBucket = QueryBuilders.matchQuery(GCMetricTable.COLUMN_5S_TIME_BUCKET, s5TimeBucket);
boolQuery.must().add(matchApplicationId); boolQuery.must().add(matchApplicationId);
boolQuery.must().add(matchTimeBucket); boolQuery.must().add(matchTimeBucket);
searchRequestBuilder.setQuery(boolQuery); searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(100); searchRequestBuilder.setSize(0);
searchRequestBuilder.addAggregation(
AggregationBuilders.terms(GCMetricTable.COLUMN_PHRASE).field(GCMetricTable.COLUMN_PHRASE)
.subAggregation(AggregationBuilders.sum(GCMetricTable.COLUMN_COUNT).field(GCMetricTable.COLUMN_COUNT)));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
SearchHit[] searchHits = searchResponse.getHits().getHits();
GCCount gcCount = new GCCount(); GCCount gcCount = new GCCount();
for (SearchHit searchHit : searchHits) { Terms phraseAggregation = searchResponse.getAggregations().get(GCMetricTable.COLUMN_PHRASE);
int phrase = (Integer)searchHit.getSource().get(GCMetricTable.COLUMN_PHRASE); for (Terms.Bucket phraseBucket : phraseAggregation.getBuckets()) {
int count = (Integer)searchHit.getSource().get(GCMetricTable.COLUMN_COUNT); int phrase = phraseBucket.getKeyAsNumber().intValue();
Sum sumAggregation = phraseBucket.getAggregations().get(GCMetricTable.COLUMN_COUNT);
int count = (int)sumAggregation.getValue();
if (phrase == GCPhrase.NEW_VALUE) { if (phrase == GCPhrase.NEW_VALUE) {
gcCount.setYoung(count); gcCount.setYoung(count);
......
package org.skywalking.apm.collector.ui.dao; package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO; import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/** /**
...@@ -10,4 +11,12 @@ public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO { ...@@ -10,4 +11,12 @@ public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO {
@Override public GCCount getGCCount(long timestamp, int instanceId) { @Override public GCCount getGCCount(long timestamp, int instanceId) {
return null; return null;
} }
@Override public JsonObject getMetric(int instanceId, long timeBucket) {
return null;
}
@Override public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
return null;
}
} }
...@@ -7,7 +7,7 @@ import java.util.List; ...@@ -7,7 +7,7 @@ import java.util.List;
* @author pengys5 * @author pengys5
*/ */
public interface IInstPerformanceDAO { public interface IInstPerformanceDAO {
List<InstPerformance> getMultiple(long timestamp, int applicationId); List<InstPerformance> getMultiple(long timeBucket, int applicationId);
int getMetric(int instanceId, long timeBucket); int getMetric(int instanceId, long timeBucket);
......
...@@ -13,7 +13,9 @@ import org.elasticsearch.action.search.SearchType; ...@@ -13,7 +13,9 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable; import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable; import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
...@@ -24,30 +26,43 @@ import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO; ...@@ -24,30 +26,43 @@ import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
*/ */
public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO { public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO {
@Override public List<InstPerformance> getMultiple(long timestamp, int applicationId) { @Override public List<InstPerformance> getMultiple(long timeBucket, int applicationId) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(InstPerformanceTable.TABLE); SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(InstPerformanceTable.TABLE);
searchRequestBuilder.setTypes(InstPerformanceTable.TABLE_TYPE); searchRequestBuilder.setTypes(InstPerformanceTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
MatchQueryBuilder matchApplicationId = QueryBuilders.matchQuery(InstPerformanceTable.COLUMN_APPLICATION_ID, applicationId); MatchQueryBuilder matchApplicationId = QueryBuilders.matchQuery(InstPerformanceTable.COLUMN_APPLICATION_ID, applicationId);
MatchQueryBuilder matchTimeBucket = QueryBuilders.matchQuery(InstPerformanceTable.COLUMN_TIME_BUCKET, timestamp); MatchQueryBuilder matchTimeBucket = QueryBuilders.matchQuery(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, timeBucket);
boolQuery.must().add(matchApplicationId); boolQuery.must().add(matchApplicationId);
boolQuery.must().add(matchTimeBucket); boolQuery.must().add(matchTimeBucket);
searchRequestBuilder.setQuery(boolQuery); searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(100); searchRequestBuilder.setSize(0);
searchRequestBuilder.addAggregation(
AggregationBuilders.terms(InstPerformanceTable.COLUMN_INSTANCE_ID).field(InstPerformanceTable.COLUMN_INSTANCE_ID)
.subAggregation(
AggregationBuilders.terms(InstPerformanceTable.COLUMN_5S_TIME_BUCKET).field(InstPerformanceTable.COLUMN_5S_TIME_BUCKET)
.subAggregation(AggregationBuilders.sum(InstPerformanceTable.COLUMN_CALL_TIMES).field(InstPerformanceTable.COLUMN_CALL_TIMES))
.subAggregation(AggregationBuilders.sum(InstPerformanceTable.COLUMN_COST_TOTAL).field(InstPerformanceTable.COLUMN_COST_TOTAL))));
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet(); SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
List<InstPerformance> instPerformances = new LinkedList<>();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
int instanceId = (Integer)searchHit.getSource().get(InstPerformanceTable.COLUMN_INSTANCE_ID);
int callTimes = (Integer)searchHit.getSource().get(InstPerformanceTable.COLUMN_CALL_TIMES);
long costTotal = ((Number)searchHit.getSource().get(InstPerformanceTable.COLUMN_COST_TOTAL)).longValue();
instPerformances.add(new InstPerformance(instanceId, callTimes, costTotal)); Terms instanceTerms = searchResponse.getAggregations().get(InstPerformanceTable.COLUMN_INSTANCE_ID);
List<InstPerformance> instPerformances = new LinkedList<>();
for (Terms.Bucket instanceBucket : instanceTerms.getBuckets()) {
int instanceId = instanceBucket.getKeyAsNumber().intValue();
Terms timeBucketTerms = instanceBucket.getAggregations().get(InstPerformanceTable.COLUMN_5S_TIME_BUCKET);
for (Terms.Bucket timeBucketBucket : timeBucketTerms.getBuckets()) {
long count = timeBucketBucket.getDocCount();
Sum sumCallTimes = timeBucketBucket.getAggregations().get(InstPerformanceTable.COLUMN_CALL_TIMES);
Sum sumCostTotal = timeBucketBucket.getAggregations().get(InstPerformanceTable.COLUMN_COST_TOTAL);
int avgCallTimes = (int)(sumCallTimes.getValue() / count);
int avgCost = (int)(sumCostTotal.getValue() / count);
instPerformances.add(new InstPerformance(instanceId, avgCallTimes, avgCost));
}
} }
return instPerformances; return instPerformances;
......
package org.skywalking.apm.collector.ui.dao; package org.skywalking.apm.collector.ui.dao;
import com.google.gson.JsonArray;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO; import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
...@@ -11,4 +12,12 @@ public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO { ...@@ -11,4 +12,12 @@ public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO {
@Override public List<InstPerformance> getMultiple(long timestamp, int applicationId) { @Override public List<InstPerformance> getMultiple(long timestamp, int applicationId) {
return null; return null;
} }
@Override public int getMetric(int instanceId, long timeBucket) {
return 0;
}
@Override public JsonArray getMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
return null;
}
} }
package org.skywalking.apm.collector.ui.dao; package org.skywalking.apm.collector.ui.dao;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO; import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.storage.table.instance.Instance;
/** /**
* @author pengys5 * @author pengys5
...@@ -20,7 +20,7 @@ public class InstanceH2DAO extends H2DAO implements IInstanceDAO { ...@@ -20,7 +20,7 @@ public class InstanceH2DAO extends H2DAO implements IInstanceDAO {
return null; return null;
} }
@Override public Instance getInstance(int instanceId) { @Override public InstanceDataDefine.Instance getInstance(int instanceId) {
return null; return null;
} }
} }
...@@ -22,17 +22,17 @@ public class ApplicationsGetHandler extends JettyHandler { ...@@ -22,17 +22,17 @@ public class ApplicationsGetHandler extends JettyHandler {
private InstanceHealthService service = new InstanceHealthService(); private InstanceHealthService service = new InstanceHealthService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
String timestamp = req.getParameter("timestamp"); String timestampStr = req.getParameter("timestamp");
logger.debug("instance health applications get time: {}", timestamp); logger.debug("instance health applications get time: {}", timestampStr);
long time; long timestamp;
try { try {
time = Long.parseLong(timestamp); timestamp = Long.parseLong(timestampStr);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new ArgumentsParseException("time must be long"); throw new ArgumentsParseException("time must be long");
} }
return service.getApplications(time); return service.getApplications(timestamp);
} }
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
......
package org.skywalking.apm.collector.ui.jetty.handler.instancehealth; package org.skywalking.apm.collector.ui.jetty.handler.instancehealth;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler; import org.skywalking.apm.collector.server.jetty.JettyHandler;
...@@ -23,8 +25,8 @@ public class InstanceHealthGetHandler extends JettyHandler { ...@@ -23,8 +25,8 @@ public class InstanceHealthGetHandler extends JettyHandler {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
String timestampStr = req.getParameter("timestamp"); String timestampStr = req.getParameter("timestamp");
String applicationIdStr = req.getParameter("applicationId"); String[] applicationIdsStr = req.getParameterValues("applicationIds");
logger.debug("instance health get timestamp: {}", timestampStr); logger.debug("instance health get timestamp: {}, applicationIdsStr: {}", timestampStr, applicationIdsStr);
long timestamp; long timestamp;
try { try {
...@@ -33,14 +35,24 @@ public class InstanceHealthGetHandler extends JettyHandler { ...@@ -33,14 +35,24 @@ public class InstanceHealthGetHandler extends JettyHandler {
throw new ArgumentsParseException("timestamp must be long"); throw new ArgumentsParseException("timestamp must be long");
} }
int applicationId; int[] applicationIds = new int[applicationIdsStr.length];
try { for (int i = 0; i < applicationIdsStr.length; i++) {
applicationId = Integer.parseInt(applicationIdStr); try {
} catch (NumberFormatException e) { applicationIds[i] = Integer.parseInt(applicationIdsStr[i]);
throw new ArgumentsParseException("application id must be integer"); } catch (NumberFormatException e) {
throw new ArgumentsParseException("application id must be integer");
}
} }
return service.getInstances(timestamp, applicationId); JsonObject response = new JsonObject();
response.addProperty("timestamp", timestamp);
JsonArray appInstances = new JsonArray();
response.add("appInstances", appInstances);
for (int applicationId : applicationIds) {
appInstances.add(service.getInstances(timestamp, applicationId));
}
return response;
} }
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
......
...@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.ui.jetty.handler.time; ...@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.ui.jetty.handler.time;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import java.util.Calendar;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler; import org.skywalking.apm.collector.server.jetty.JettyHandler;
...@@ -25,8 +26,15 @@ public class AllInstanceLastTimeGetHandler extends JettyHandler { ...@@ -25,8 +26,15 @@ public class AllInstanceLastTimeGetHandler extends JettyHandler {
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
Long time = service.allInstanceLastTime(); Long time = service.allInstanceLastTime();
logger.debug("all instance last time: {}", time); logger.debug("all instance last time: {}", time);
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
int second = calendar.get(Calendar.SECOND);
second = (second % 5) * 5;
calendar.set(Calendar.SECOND, second);
JsonObject timeJson = new JsonObject(); JsonObject timeJson = new JsonObject();
timeJson.addProperty("time", time); timeJson.addProperty("time", calendar.getTimeInMillis());
return timeJson; return timeJson;
} }
......
...@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.ui.service; ...@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.ui.service;
import com.google.gson.JsonArray; import com.google.gson.JsonArray;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import java.util.List; import java.util.List;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer; import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.ui.cache.ApplicationCache; import org.skywalking.apm.collector.ui.cache.ApplicationCache;
import org.skywalking.apm.collector.ui.dao.IGCMetricDAO; import org.skywalking.apm.collector.ui.dao.IGCMetricDAO;
...@@ -18,14 +19,14 @@ public class InstanceHealthService { ...@@ -18,14 +19,14 @@ public class InstanceHealthService {
private final Logger logger = LoggerFactory.getLogger(InstanceHealthService.class); private final Logger logger = LoggerFactory.getLogger(InstanceHealthService.class);
public JsonObject getApplications(long timestamp) { public JsonObject getApplications(long timeBucket) {
IInstanceDAO instanceDAO = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName()); IInstanceDAO instanceDAO = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
List<IInstanceDAO.Application> applications = instanceDAO.getApplications(timestamp); List<IInstanceDAO.Application> applications = instanceDAO.getApplications(timeBucket);
JsonObject response = new JsonObject(); JsonObject response = new JsonObject();
JsonArray applicationArray = new JsonArray(); JsonArray applicationArray = new JsonArray();
response.addProperty("timestamp", timestamp); response.addProperty("timeBucket", timeBucket);
response.add("applicationList", applicationArray); response.add("applicationList", applicationArray);
applications.forEach(application -> { applications.forEach(application -> {
...@@ -43,15 +44,16 @@ public class InstanceHealthService { ...@@ -43,15 +44,16 @@ public class InstanceHealthService {
public JsonObject getInstances(long timestamp, int applicationId) { public JsonObject getInstances(long timestamp, int applicationId) {
JsonObject response = new JsonObject(); JsonObject response = new JsonObject();
IInstPerformanceDAO instPerformanceDAO = (IInstPerformanceDAO)DAOContainer.INSTANCE.get(IInstPerformanceDAO.class.getName()); long secondTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(timestamp);
List<IInstPerformanceDAO.InstPerformance> performances = instPerformanceDAO.getMultiple(timestamp, applicationId); long s5TimeBucket = TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(secondTimeBucket);
response.addProperty("timestamp", timestamp); IInstPerformanceDAO instPerformanceDAO = (IInstPerformanceDAO)DAOContainer.INSTANCE.get(IInstPerformanceDAO.class.getName());
List<IInstPerformanceDAO.InstPerformance> performances = instPerformanceDAO.getMultiple(s5TimeBucket, applicationId);
JsonArray instances = new JsonArray(); JsonArray instances = new JsonArray();
response.addProperty("applicationCode", ApplicationCache.get(applicationId)); response.addProperty("applicationCode", ApplicationCache.getForUI(applicationId));
response.addProperty("applicationId", applicationId); response.addProperty("applicationId", applicationId);
response.add("appInstances", instances); response.add("instances", instances);
IGCMetricDAO gcMetricDAO = (IGCMetricDAO)DAOContainer.INSTANCE.get(IGCMetricDAO.class.getName()); IGCMetricDAO gcMetricDAO = (IGCMetricDAO)DAOContainer.INSTANCE.get(IGCMetricDAO.class.getName());
performances.forEach(instance -> { performances.forEach(instance -> {
...@@ -59,14 +61,14 @@ public class InstanceHealthService { ...@@ -59,14 +61,14 @@ public class InstanceHealthService {
instanceJson.addProperty("id", instance.getInstanceId()); instanceJson.addProperty("id", instance.getInstanceId());
instanceJson.addProperty("tps", instance.getCallTimes()); instanceJson.addProperty("tps", instance.getCallTimes());
int avg = (int)((instance.getCostTotal() / instance.getCallTimes()) / 1000); int avg = (int)(instance.getCostTotal() / instance.getCallTimes());
instanceJson.addProperty("avg", avg); instanceJson.addProperty("avg", avg);
if (avg > 5) { if (avg > 5000) {
instanceJson.addProperty("healthLevel", 0); instanceJson.addProperty("healthLevel", 0);
} else if (avg > 3 && avg <= 5) { } else if (avg > 3000 && avg <= 5000) {
instanceJson.addProperty("healthLevel", 1); instanceJson.addProperty("healthLevel", 1);
} else if (avg > 1 && avg <= 3) { } else if (avg > 1000 && avg <= 3000) {
instanceJson.addProperty("healthLevel", 2); instanceJson.addProperty("healthLevel", 2);
} else { } else {
instanceJson.addProperty("healthLevel", 3); instanceJson.addProperty("healthLevel", 3);
...@@ -74,7 +76,7 @@ public class InstanceHealthService { ...@@ -74,7 +76,7 @@ public class InstanceHealthService {
instanceJson.addProperty("status", 0); instanceJson.addProperty("status", 0);
IGCMetricDAO.GCCount gcCount = gcMetricDAO.getGCCount(timestamp, instance.getInstanceId()); IGCMetricDAO.GCCount gcCount = gcMetricDAO.getGCCount(s5TimeBucket, instance.getInstanceId());
instanceJson.addProperty("ygc", gcCount.getYoung()); instanceJson.addProperty("ygc", gcCount.getYoung());
instanceJson.addProperty("ogc", gcCount.getOld()); instanceJson.addProperty("ogc", gcCount.getOld());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册