提交 c5166159 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge branch 'master' into wu-sheng-3.2-readme

......@@ -78,14 +78,34 @@ public class JVMMetricsServiceHandlerTestCase {
}
private static void buildMemoryPoolMetric(JVMMetric.Builder jvmMetric) {
MemoryPool.Builder builder_1 = MemoryPool.newBuilder();
builder_1.setType(PoolType.NEWGEN_USAGE);
builder_1.setIsHeap(true);
builder_1.setInit(20);
builder_1.setMax(100);
builder_1.setUsed(50);
builder_1.setCommited(30);
jvmMetric.addMemoryPool(builder_1.build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.NEWGEN_USAGE, true).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.NEWGEN_USAGE, false).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.OLDGEN_USAGE, true).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.OLDGEN_USAGE, false).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.METASPACE_USAGE, true).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.METASPACE_USAGE, false).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.PERMGEN_USAGE, true).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.PERMGEN_USAGE, false).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.SURVIVOR_USAGE, true).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.SURVIVOR_USAGE, false).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.CODE_CACHE_USAGE, true).build());
jvmMetric.addMemoryPool(buildMemoryPoolMetric(PoolType.CODE_CACHE_USAGE, false).build());
}
private static MemoryPool.Builder buildMemoryPoolMetric(PoolType poolType, boolean isHeap) {
MemoryPool.Builder builder = MemoryPool.newBuilder();
builder.setType(poolType);
builder.setIsHeap(isHeap);
builder.setInit(20);
builder.setMax(100);
builder.setUsed(50);
builder.setCommited(30);
return builder;
}
private static void buildGcMetric(JVMMetric.Builder jvmMetric) {
......
package org.skywalking.apm.collector.agentregister.instance;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -28,7 +28,7 @@ public class InstanceIDService {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance("0", applicationId, agentUUID, registerTime, 0, registerTime, osInfo);
try {
context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(instance);
context.getClusterWorkerContext().lookup(InstanceRegisterRemoteWorker.WorkerRole.INSTANCE).tell(instance);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
......
......@@ -10,16 +10,26 @@ import org.skywalking.apm.collector.storage.dao.DAOContainer;
*/
public class ApplicationCache {
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
public static int get(String applicationCode) {
int applicationId = 0;
try {
return CACHE.get(applicationCode, () -> {
applicationId = CACHE.get(applicationCode, () -> {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
return dao.getApplicationId(applicationCode);
});
} catch (Throwable e) {
return 0;
return applicationId;
}
if (applicationId == 0) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
applicationId = dao.getApplicationId(applicationCode);
if (applicationId != 0) {
CACHE.put(applicationCode, applicationId);
}
}
return applicationId;
}
}
......@@ -10,7 +10,7 @@ import org.skywalking.apm.collector.storage.dao.DAOContainer;
*/
public class InstanceCache {
private static Cache<Integer, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(1000).build();
private static Cache<Integer, Integer> CACHE = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
public static int get(int applicationInstanceId) {
try {
......
......@@ -11,7 +11,7 @@ import org.skywalking.apm.collector.storage.dao.DAOContainer;
*/
public class ServiceCache {
private static Cache<Integer, String> CACHE = CacheBuilder.newBuilder().maximumSize(10000).build();
private static Cache<Integer, String> CACHE = CacheBuilder.newBuilder().initialCapacity(1000).maximumSize(20000).build();
public static String getServiceName(int serviceId) {
try {
......
......@@ -14,7 +14,7 @@ public class GlobalTraceEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 2;
return 5;
}
@Override public int numberOfShards() {
......
......@@ -3,6 +3,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
......@@ -10,7 +11,6 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
......@@ -47,8 +47,8 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker {
} else {
int max = dao.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
serviceName.setApplicationId(serviceId);
serviceName.setId(String.valueOf(serviceId));
serviceName.setServiceId(serviceId);
}
dao.save(serviceName);
}
......
......@@ -30,7 +30,7 @@ public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ServiceNameTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_APPLICATION_ID, applicationId));
......
package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostTable;
/**
* @author pengys5
......@@ -14,7 +14,7 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 2;
return 5;
}
@Override public int numberOfShards() {
......@@ -27,7 +27,7 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Text.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_COST, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_START_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_END_TIME, ElasticSearchColumnDefine.Type.Long.name()));
......
......@@ -28,9 +28,9 @@ public class SegmentPost {
InstanceEsDAO instanceEsDAO = new InstanceEsDAO();
instanceEsDAO.setClient(client);
InstanceDataDefine.Instance consumerInstance = new InstanceDataDefine.Instance("2", 2, "dubbox-consumer", now, 2, now, "");
InstanceDataDefine.Instance consumerInstance = new InstanceDataDefine.Instance("2", 2, "dubbox-consumer", now, 2, now, osInfo("consumer").toString());
instanceEsDAO.save(consumerInstance);
InstanceDataDefine.Instance providerInstance = new InstanceDataDefine.Instance("3", 3, "dubbox-provider", now, 3, now, "");
InstanceDataDefine.Instance providerInstance = new InstanceDataDefine.Instance("3", 3, "dubbox-provider", now, 3, now, osInfo("provider").toString());
instanceEsDAO.save(providerInstance);
ApplicationEsDAO applicationEsDAO = new ApplicationEsDAO();
......@@ -64,10 +64,13 @@ public class SegmentPost {
modifyTime(provider);
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
diff = 0;
Thread.sleep(1000);
}
}
private static long diff = 0;
private static void modifyTime(JsonElement jsonElement) {
JsonArray segmentArray = jsonElement.getAsJsonArray();
for (JsonElement element : segmentArray) {
......@@ -76,10 +79,28 @@ public class SegmentPost {
for (JsonElement span : spans) {
long startTime = span.getAsJsonObject().get("st").getAsLong();
long endTime = span.getAsJsonObject().get("et").getAsLong();
long currentTime = System.currentTimeMillis();
span.getAsJsonObject().addProperty("st", currentTime);
span.getAsJsonObject().addProperty("et", currentTime + (endTime - startTime));
if (diff == 0) {
diff = System.currentTimeMillis() - startTime;
}
span.getAsJsonObject().addProperty("st", startTime + diff);
span.getAsJsonObject().addProperty("et", endTime + diff);
}
}
}
private static JsonObject osInfo(String hostName) {
JsonObject osInfoJson = new JsonObject();
osInfoJson.addProperty("osName", "Linux");
osInfoJson.addProperty("hostName", hostName);
osInfoJson.addProperty("processId", 1);
JsonArray ipv4Array = new JsonArray();
ipv4Array.add("123.123.123.123");
ipv4Array.add("124.124.124.124");
osInfoJson.add("ipv4s", ipv4Array);
return osInfoJson;
}
}
package org.skywalking.apm.collector.agentstream.mock.grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithStringValue;
import org.skywalking.apm.network.proto.LogMessage;
import org.skywalking.apm.network.proto.OSInfo;
import org.skywalking.apm.network.proto.RefType;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.SpanLayer;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentReference;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GrpcSegmentPost {
private final Logger logger = LoggerFactory.getLogger(GrpcSegmentPost.class);
private AtomicLong sequence = new AtomicLong(1);
@Test
public void init() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).maxInboundMessageSize(1024 * 1024 * 50).usePlaintext(true).build();
int consumerApplicationId = 0;
int providerApplicationId = 0;
int consumerInstanceId = 0;
int providerInstanceId = 0;
int consumerEntryServiceId = 0;
int consumerExitServiceId = 0;
int consumerExitApplicationId = 0;
int providerEntryServiceId = 0;
while (consumerApplicationId == 0) {
consumerApplicationId = registerApplication(channel, "consumer");
}
while (consumerExitApplicationId == 0) {
consumerExitApplicationId = registerApplication(channel, "172.25.0.4:20880");
}
while (providerApplicationId == 0) {
providerApplicationId = registerApplication(channel, "provider");
}
while (consumerInstanceId == 0) {
consumerInstanceId = registerInstanceId(channel, "ConsumerUUID", consumerApplicationId, "consumer_host_name", 1);
}
while (providerInstanceId == 0) {
providerInstanceId = registerInstanceId(channel, "ProviderUUID", providerApplicationId, "provider_host_name", 2);
}
while (consumerEntryServiceId == 0) {
consumerEntryServiceId = registerServiceId(channel, consumerApplicationId, "/dubbox-case/case/dubbox-rest");
}
while (consumerExitServiceId == 0) {
consumerExitServiceId = registerServiceId(channel, consumerApplicationId, "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
}
while (providerEntryServiceId == 0) {
providerEntryServiceId = registerServiceId(channel, providerApplicationId, "org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
}
Ids ids = new Ids();
ids.setConsumerApplicationId(consumerApplicationId);
ids.setProviderApplicationId(providerApplicationId);
ids.setConsumerInstanceId(consumerInstanceId);
ids.setProviderInstanceId(providerInstanceId);
ids.setConsumerEntryServiceId(consumerEntryServiceId);
ids.setConsumerExitServiceId(consumerExitServiceId);
ids.setConsumerExitApplicationId(consumerExitApplicationId);
long startTime = TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis());
logger.info("start time: {}", startTime);
int count = 10;
ThreadCount threadCount = new ThreadCount(count);
for (int i = 0; i < count; i++) {
Status status = new Status();
BuildNewSegment buildNewSegment = new BuildNewSegment(channel, ids, threadCount, i, status);
Executors.newSingleThreadExecutor().execute(buildNewSegment);
}
while (threadCount.getCount() != 0) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
long endTime = TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis());
logger.info("end time: {}", endTime);
channel.shutdownNow();
while (!channel.isTerminated()) {
try {
channel.awaitTermination(100, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
private int registerApplication(ManagedChannel channel, String applicationCode) {
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode(applicationCode).build();
ApplicationMapping mapping = stub.register(application);
int applicationId = mapping.getApplication(0).getValue();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return applicationId;
}
private int registerInstanceId(ManagedChannel channel, String agentUUId, Integer applicationId,
String hostName, int processNo) {
InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub stub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
ApplicationInstance.Builder instance = ApplicationInstance.newBuilder();
instance.setApplicationId(applicationId);
instance.setRegisterTime(System.currentTimeMillis());
instance.setAgentUUID(agentUUId);
OSInfo.Builder osInfo = OSInfo.newBuilder();
osInfo.setHostname(hostName);
osInfo.setOsName("Linux");
osInfo.setProcessNo(processNo);
osInfo.addIpv4S("10.0.0.1");
osInfo.addIpv4S("10.0.0.2");
instance.setOsinfo(osInfo.build());
ApplicationInstanceMapping mapping = stub.register(instance.build());
int instanceId = mapping.getApplicationInstanceId();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return instanceId;
}
private int registerServiceId(ManagedChannel channel, int applicationId, String serviceName) {
ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub stub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
ServiceNameCollection.Builder collection = ServiceNameCollection.newBuilder();
ServiceNameElement.Builder element = ServiceNameElement.newBuilder();
element.setApplicationId(applicationId);
element.setServiceName(serviceName);
collection.addElements(element);
ServiceNameMappingCollection mappingCollection = stub.discovery(collection.build());
int serviceId = mappingCollection.getElements(0).getServiceId();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
return serviceId;
}
class BuildNewSegment implements Runnable {
private final ManagedChannel segmentChannel;
private final Ids ids;
private final ThreadCount threadCount;
private final int procNo;
private final Status status;
private StreamObserver<UpstreamSegment> streamObserver;
public BuildNewSegment(ManagedChannel segmentChannel,
Ids ids, ThreadCount threadCount, int procNo,
Status status) {
this.segmentChannel = segmentChannel;
this.ids = ids;
this.threadCount = threadCount;
this.procNo = procNo;
this.status = status;
}
@Override public void run() {
statusChange();
int i = 0;
while (i < 50000) {
send(streamObserver, ids);
i++;
if (i % 10000 == 0) {
logger.info("process no: {}, send segment count: {}", procNo, i);
streamObserver.onCompleted();
while (!status.isFinish) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
status.setFinish(false);
statusChange();
}
}
this.threadCount.finishOne();
}
private void statusChange() {
TraceSegmentServiceGrpc.TraceSegmentServiceStub stub = TraceSegmentServiceGrpc.newStub(segmentChannel);
streamObserver = stub.collect(new StreamObserver<Downstream>() {
@Override public void onNext(Downstream downstream) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
status.setFinish(true);
logger.info("process no: {}, server completed", procNo);
}
});
}
}
public void send(StreamObserver<UpstreamSegment> streamObserver, Ids ids) {
long now = System.currentTimeMillis();
UniqueId consumerSegmentId = createSegmentId();
UniqueId providerSegmentId = createSegmentId();
streamObserver.onNext(createConsumerSegment(consumerSegmentId, ids, now));
streamObserver.onNext(createProviderSegment(consumerSegmentId, providerSegmentId, ids, now));
}
private UpstreamSegment createConsumerSegment(UniqueId segmentId, Ids ids, long timestamp) {
UpstreamSegment.Builder upstream = UpstreamSegment.newBuilder();
upstream.addGlobalTraceIds(segmentId);
TraceSegmentObject.Builder segmentBuilder = TraceSegmentObject.newBuilder();
segmentBuilder.setApplicationId(ids.consumerApplicationId);
segmentBuilder.setApplicationInstanceId(ids.consumerInstanceId);
segmentBuilder.setTraceSegmentId(segmentId);
SpanObject.Builder entrySpan = SpanObject.newBuilder();
entrySpan.setSpanId(0);
entrySpan.setSpanType(SpanType.Entry);
entrySpan.setSpanLayer(SpanLayer.Http);
entrySpan.setParentSpanId(-1);
entrySpan.setStartTime(timestamp);
entrySpan.setEndTime(timestamp + 3000);
entrySpan.setComponentId(ComponentsDefine.TOMCAT.getId());
entrySpan.setOperationNameId(ids.getConsumerEntryServiceId());
entrySpan.setIsError(false);
LogMessage.Builder entryLogMessage = LogMessage.newBuilder();
entryLogMessage.setTime(timestamp);
KeyWithStringValue.Builder data_1 = KeyWithStringValue.newBuilder();
data_1.setKey("url");
data_1.setValue("http://localhost:18080/dubbox-case/case/dubbox-rest");
entryLogMessage.addData(data_1);
KeyWithStringValue.Builder data_2 = KeyWithStringValue.newBuilder();
data_2.setKey("http.method");
data_2.setValue("GET");
entryLogMessage.addData(data_2);
entrySpan.addLogs(entryLogMessage);
segmentBuilder.addSpans(entrySpan);
SpanObject.Builder exitSpan = SpanObject.newBuilder();
exitSpan.setSpanId(1);
exitSpan.setSpanType(SpanType.Exit);
exitSpan.setSpanLayer(SpanLayer.RPCFramework);
exitSpan.setParentSpanId(0);
exitSpan.setStartTime(timestamp + 500);
exitSpan.setEndTime(timestamp + 2500);
exitSpan.setComponentId(ComponentsDefine.TOMCAT.getId());
exitSpan.setOperationNameId(ids.getConsumerExitServiceId());
exitSpan.setPeerId(ids.consumerExitApplicationId);
exitSpan.setIsError(false);
LogMessage.Builder exitLogMessage = LogMessage.newBuilder();
exitLogMessage.setTime(timestamp);
KeyWithStringValue.Builder data = KeyWithStringValue.newBuilder();
data.setKey("url");
data.setValue("rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
exitLogMessage.addData(data);
exitSpan.addLogs(exitLogMessage);
segmentBuilder.addSpans(exitSpan);
upstream.setSegment(segmentBuilder.build().toByteString());
return upstream.build();
}
private UpstreamSegment createProviderSegment(UniqueId consumerSegmentId, UniqueId providerSegmentId, Ids ids,
long timestamp) {
UpstreamSegment.Builder upstream = UpstreamSegment.newBuilder();
upstream.addGlobalTraceIds(consumerSegmentId);
TraceSegmentObject.Builder segmentBuilder = TraceSegmentObject.newBuilder();
segmentBuilder.setApplicationId(ids.providerApplicationId);
segmentBuilder.setApplicationInstanceId(ids.providerInstanceId);
segmentBuilder.setTraceSegmentId(providerSegmentId);
TraceSegmentReference.Builder referenceBuilder = TraceSegmentReference.newBuilder();
referenceBuilder.setParentTraceSegmentId(consumerSegmentId);
referenceBuilder.setParentApplicationInstanceId(ids.getConsumerInstanceId());
referenceBuilder.setParentSpanId(1);
referenceBuilder.setParentServiceId(ids.getConsumerExitServiceId());
referenceBuilder.setEntryApplicationInstanceId(ids.getConsumerInstanceId());
referenceBuilder.setEntryServiceId(ids.getConsumerEntryServiceId());
referenceBuilder.setNetworkAddressId(ids.consumerExitApplicationId);
referenceBuilder.setRefType(RefType.CrossProcess);
segmentBuilder.addRefs(referenceBuilder);
SpanObject.Builder entrySpan = SpanObject.newBuilder();
entrySpan.setSpanId(0);
entrySpan.setSpanType(SpanType.Entry);
entrySpan.setSpanLayer(SpanLayer.RPCFramework);
entrySpan.setParentSpanId(-1);
entrySpan.setStartTime(timestamp + 1000);
entrySpan.setEndTime(timestamp + 2000);
entrySpan.setComponentId(ComponentsDefine.TOMCAT.getId());
entrySpan.setOperationNameId(ids.getProviderEntryServiceId());
entrySpan.setIsError(false);
LogMessage.Builder entryLogMessage = LogMessage.newBuilder();
entryLogMessage.setTime(timestamp);
KeyWithStringValue.Builder data_1 = KeyWithStringValue.newBuilder();
data_1.setKey("url");
data_1.setValue("rest://172.25.0.4:20880/org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
entryLogMessage.addData(data_1);
KeyWithStringValue.Builder data_2 = KeyWithStringValue.newBuilder();
data_2.setKey("http.method");
data_2.setValue("GET");
entryLogMessage.addData(data_2);
entrySpan.addLogs(entryLogMessage);
segmentBuilder.addSpans(entrySpan);
upstream.setSegment(segmentBuilder.build().toByteString());
return upstream.build();
}
private UniqueId createSegmentId() {
long id = sequence.getAndIncrement();
UniqueId.Builder builder = UniqueId.newBuilder();
builder.addIdParts(id);
builder.addIdParts(id);
builder.addIdParts(id);
return builder.build();
}
class Ids {
private int consumerApplicationId = 0;
private int providerApplicationId = 0;
private int consumerInstanceId = 0;
private int providerInstanceId = 0;
private int consumerEntryServiceId = 0;
private int consumerExitServiceId = 0;
private int consumerExitApplicationId = 0;
private int providerEntryServiceId = 0;
public int getConsumerApplicationId() {
return consumerApplicationId;
}
public void setConsumerApplicationId(int consumerApplicationId) {
this.consumerApplicationId = consumerApplicationId;
}
public int getProviderApplicationId() {
return providerApplicationId;
}
public void setProviderApplicationId(int providerApplicationId) {
this.providerApplicationId = providerApplicationId;
}
public int getConsumerInstanceId() {
return consumerInstanceId;
}
public void setConsumerInstanceId(int consumerInstanceId) {
this.consumerInstanceId = consumerInstanceId;
}
public int getProviderInstanceId() {
return providerInstanceId;
}
public void setProviderInstanceId(int providerInstanceId) {
this.providerInstanceId = providerInstanceId;
}
public int getConsumerEntryServiceId() {
return consumerEntryServiceId;
}
public void setConsumerEntryServiceId(int consumerEntryServiceId) {
this.consumerEntryServiceId = consumerEntryServiceId;
}
public int getConsumerExitServiceId() {
return consumerExitServiceId;
}
public void setConsumerExitServiceId(int consumerExitServiceId) {
this.consumerExitServiceId = consumerExitServiceId;
}
public int getConsumerExitApplicationId() {
return consumerExitApplicationId;
}
public void setConsumerExitApplicationId(int consumerExitApplicationId) {
this.consumerExitApplicationId = consumerExitApplicationId;
}
public int getProviderEntryServiceId() {
return providerEntryServiceId;
}
public void setProviderEntryServiceId(int providerEntryServiceId) {
this.providerEntryServiceId = providerEntryServiceId;
}
}
class ThreadCount {
private int count;
public ThreadCount(int count) {
this.count = count;
}
public void finishOne() {
count--;
}
public int getCount() {
return count;
}
}
class Status {
private boolean isFinish = false;
public boolean isFinish() {
return isFinish;
}
public void setFinish(boolean finish) {
isFinish = finish;
}
}
}
......@@ -40,8 +40,8 @@
"tv": 0,
"lv": 2,
"ps": -1,
"st": 1501858094883,
"et": 1501858096950,
"st": 1501858094726,
"et": 1501858096804,
"ci": 3,
"cn": "",
"oi": 0,
......
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
</layout>
</appender>
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="INFO"/>
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
\ No newline at end of file
......@@ -25,6 +25,6 @@ storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: 127.0.0.1:9300
cluster_nodes: 10.0.0.19:9300,10.0.0.6:9300
......@@ -4,12 +4,16 @@ import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServerHolder {
private final Logger logger = LoggerFactory.getLogger(ServerHolder.class);
private List<Server> servers;
public ServerHolder() {
......@@ -33,7 +37,10 @@ public class ServerHolder {
private void addHandler(List<Handler> handlers, Server server) {
if (CollectionUtils.isNotEmpty(handlers)) {
handlers.forEach(handler -> server.addHandler(handler));
handlers.forEach(handler -> {
server.addHandler(handler);
logger.debug("add handler into server: {}, handler name: {}", server.hostPort(), handler.getClass().getName());
});
}
}
......
......@@ -4,7 +4,7 @@ option java_multiple_files = true;
option java_package = "org.skywalking.apm.collector.remote.grpc.proto";
service RemoteCommonService {
rpc call (RemoteMessage) returns (Empty) {
rpc call (stream RemoteMessage) returns (Empty) {
}
}
......
......@@ -53,7 +53,12 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
return Settings.builder()
.put("index.number_of_shards", tableDefine.numberOfShards())
.put("index.number_of_replicas", tableDefine.numberOfReplicas())
.put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s").build();
.put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s")
.put("analysis.analyzer.collector_analyzer.tokenizer", "collector_tokenizer")
.put("analysis.tokenizer.collector_tokenizer.type", "standard")
.put("analysis.tokenizer.collector_tokenizer.max_token_length", 5)
.build();
}
private XContentBuilder createMappingBuilder(ElasticSearchTableDefine tableDefine) throws IOException {
......
......@@ -22,17 +22,29 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class);
@Override public void call(RemoteMessage request, StreamObserver<Empty> responseObserver) {
String roleName = request.getWorkerRole();
RemoteData remoteData = request.getRemoteData();
@Override public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
return new StreamObserver<RemoteMessage>() {
@Override public void onNext(RemoteMessage message) {
String roleName = message.getWorkerRole();
RemoteData remoteData = message.getRemoteData();
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
Role role = context.getClusterWorkerContext().getRole(roleName);
Object object = role.dataDefine().deserialize(remoteData);
try {
context.getClusterWorkerContext().lookupInSide(roleName).tell(object);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
Role role = context.getClusterWorkerContext().getRole(roleName);
Object object = role.dataDefine().deserialize(remoteData);
try {
context.getClusterWorkerContext().lookupInSide(roleName).tell(object);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
package org.skywalking.apm.collector.stream.worker;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class RemoteWorkerRef extends WorkerRef {
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
private final Boolean acrossJVM;
private final RemoteCommonServiceGrpc.RemoteCommonServiceBlockingStub stub;
private final RemoteCommonServiceGrpc.RemoteCommonServiceStub stub;
private StreamObserver<RemoteMessage> streamObserver;
private final AbstractRemoteWorker remoteWorker;
public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) {
......@@ -25,7 +32,8 @@ public class RemoteWorkerRef extends WorkerRef {
super(role);
this.remoteWorker = null;
this.acrossJVM = true;
this.stub = RemoteCommonServiceGrpc.newBlockingStub(client.getChannel());
this.stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
createStreamObserver();
}
@Override
......@@ -36,7 +44,8 @@ public class RemoteWorkerRef extends WorkerRef {
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(getRole().roleName());
builder.setRemoteData(remoteData);
stub.call(builder.build());
streamObserver.onNext(builder.build());
} else {
remoteWorker.allocateJob(message);
}
......@@ -45,4 +54,63 @@ public class RemoteWorkerRef extends WorkerRef {
public Boolean isAcrossJVM() {
return acrossJVM;
}
private void createStreamObserver() {
StreamStatus status = new StreamStatus(false);
streamObserver = stub.call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
status.finished();
}
});
}
class StreamStatus {
private volatile boolean status;
public StreamStatus(boolean status) {
this.status = status;
}
public boolean isFinish() {
return status;
}
public void finished() {
this.status = true;
}
/**
* @param maxTimeout max wait time, milliseconds.
*/
public void wait4Finish(long maxTimeout) {
long time = 0;
while (!status) {
if (time > maxTimeout) {
break;
}
try2Sleep(5);
time += 5;
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}
}
}
......@@ -51,7 +51,7 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
private void sendToNext() throws WorkerException {
dataCache.switchPointer();
while (dataCache.getLast().isHolding()) {
while (dataCache.getLast().isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
......@@ -66,17 +66,17 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
logger.error(e.getMessage(), e);
}
});
dataCache.releaseLast();
dataCache.finishReadingLast();
}
protected final void aggregate(Object message) {
Data data = (Data)message;
dataCache.hold();
dataCache.writing();
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
} else {
dataCache.put(data.id(), data);
}
dataCache.release();
dataCache.finishWriting();
}
}
package org.skywalking.apm.collector.stream.worker.impl;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.dao.IBatchDAO;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
......@@ -35,24 +37,37 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
}
@Override protected final void onWork(Object message) throws WorkerException {
if (message instanceof EndOfBatchCommand || message instanceof FlushAndSwitch) {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} else {
if (dataCache.currentCollectionSize() >= 1000) {
if (message instanceof FlushAndSwitch) {
try {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
}
} finally {
dataCache.trySwitchPointerFinally();
}
} else if (message instanceof EndOfBatchCommand) {
} else {
if (dataCache.currentCollectionSize() >= 5000) {
try {
if (dataCache.trySwitchPointer()) {
dataCache.switchPointer();
List<?> collection = buildBatchCollection();
IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName());
dao.batchPersistence(collection);
}
} finally {
dataCache.trySwitchPointerFinally();
}
}
aggregate(message);
}
}
public final List<?> buildBatchCollection() throws WorkerException {
List<?> batchCollection;
List<?> batchCollection = new LinkedList<>();
try {
while (dataCache.getLast().isHolding()) {
while (dataCache.getLast().isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
......@@ -60,16 +75,18 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
}
}
batchCollection = prepareBatch(dataCache.getLast().asMap());
if (dataCache.getLast().asMap() != null) {
batchCollection = prepareBatch(dataCache.getLast().asMap());
}
} finally {
dataCache.releaseLast();
dataCache.finishReadingLast();
}
return batchCollection;
}
protected final List<Object> prepareBatch(Map<String, Data> dataMap) {
List<Object> insertBatchCollection = new ArrayList<>();
List<Object> updateBatchCollection = new ArrayList<>();
List<Object> insertBatchCollection = new LinkedList<>();
List<Object> updateBatchCollection = new LinkedList<>();
dataMap.forEach((id, data) -> {
if (needMergeDBData()) {
Data dbData = persistenceDAO().get(id, getRole().dataDefine());
......@@ -101,18 +118,16 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker {
}
private void aggregate(Object message) {
dataCache.hold();
dataCache.writing();
Data data = (Data)message;
if (dataCache.containsKey(data.id())) {
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
} else {
if (dataCache.currentCollectionSize() < 1000) {
dataCache.put(data.id(), data);
}
dataCache.put(data.id(), data);
}
dataCache.release();
dataCache.finishWriting();
}
protected abstract IPersistenceDAO persistenceDAO();
......
......@@ -21,16 +21,16 @@ public class DataCache extends Window {
lockedDataCollection.put(id, data);
}
public void hold() {
lockedDataCollection = getCurrentAndHold();
public void writing() {
lockedDataCollection = getCurrentAndWriting();
}
public int currentCollectionSize() {
return getCurrentAndHold().size();
return getCurrent().size();
}
public void release() {
lockedDataCollection.release();
public void finishWriting() {
lockedDataCollection.finishWriting();
lockedDataCollection = null;
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.core.stream.Data;
/**
......@@ -9,23 +9,37 @@ import org.skywalking.apm.collector.core.stream.Data;
*/
public class DataCollection {
private Map<String, Data> data;
private volatile boolean isHold;
private volatile boolean writing;
private volatile boolean reading;
public DataCollection() {
this.data = new HashMap<>();
this.isHold = false;
this.data = new ConcurrentHashMap<>();
this.writing = false;
this.reading = false;
}
public void release() {
isHold = false;
public void finishWriting() {
writing = false;
}
public void hold() {
isHold = true;
public void writing() {
writing = true;
}
public boolean isHolding() {
return isHold;
public boolean isWriting() {
return writing;
}
public void finishReading() {
reading = false;
}
public void reading() {
reading = true;
}
public boolean isReading() {
return reading;
}
public boolean containsKey(String key) {
......
......@@ -21,12 +21,11 @@ public abstract class Window {
}
public boolean trySwitchPointer() {
if (windowSwitch.incrementAndGet() == 1) {
return true;
} else {
windowSwitch.addAndGet(-1);
return false;
}
return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
}
public void trySwitchPointerFinally() {
windowSwitch.addAndGet(-1);
}
public void switchPointer() {
......@@ -35,18 +34,23 @@ public abstract class Window {
} else {
pointer = windowDataA;
}
getLast().reading();
}
protected DataCollection getCurrentAndHold() {
protected DataCollection getCurrentAndWriting() {
if (pointer == windowDataA) {
windowDataA.hold();
windowDataA.writing();
return windowDataA;
} else {
windowDataB.hold();
windowDataB.writing();
return windowDataB;
}
}
protected DataCollection getCurrent() {
return pointer;
}
public DataCollection getLast() {
if (pointer == windowDataA) {
return windowDataB;
......@@ -55,8 +59,8 @@ public abstract class Window {
}
}
public void releaseLast() {
public void finishReadingLast() {
getLast().clear();
windowSwitch.addAndGet(-1);
getLast().finishReading();
}
}
......@@ -16,7 +16,6 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.network.proto.GCPhrase;
......@@ -92,8 +91,8 @@ public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO {
MultiGetRequestBuilder youngPrepareMultiGet = getClient().prepareMultiGet();
int i = 0;
do {
String youngId = (startTimeBucket + i) + Const.ID_SPLIT + GCPhrase.NEW_VALUE + instanceId;
youngPrepareMultiGet.add(CpuMetricTable.TABLE, CpuMetricTable.TABLE_TYPE, youngId);
String youngId = (startTimeBucket + i) + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + GCPhrase.NEW_VALUE;
youngPrepareMultiGet.add(GCMetricTable.TABLE, GCMetricTable.TABLE_TYPE, youngId);
i++;
}
while (startTimeBucket + i <= endTimeBucket);
......@@ -102,7 +101,7 @@ public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO {
MultiGetResponse multiGetResponse = youngPrepareMultiGet.get();
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
if (itemResponse.getResponse().isExists()) {
youngArray.add(((Number)itemResponse.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).intValue());
youngArray.add(((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).intValue());
} else {
youngArray.add(0);
}
......@@ -112,8 +111,8 @@ public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO {
MultiGetRequestBuilder oldPrepareMultiGet = getClient().prepareMultiGet();
i = 0;
do {
String oldId = (startTimeBucket + i) + Const.ID_SPLIT + GCPhrase.OLD_VALUE + instanceId;
oldPrepareMultiGet.add(CpuMetricTable.TABLE, CpuMetricTable.TABLE_TYPE, oldId);
String oldId = (startTimeBucket + i) + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + GCPhrase.OLD_VALUE;
oldPrepareMultiGet.add(GCMetricTable.TABLE, GCMetricTable.TABLE_TYPE, oldId);
i++;
}
while (startTimeBucket + i <= endTimeBucket);
......@@ -123,7 +122,7 @@ public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO {
multiGetResponse = oldPrepareMultiGet.get();
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
if (itemResponse.getResponse().isExists()) {
oldArray.add(((Number)itemResponse.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).intValue());
oldArray.add(((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).intValue());
} else {
oldArray.add(0);
}
......
......@@ -9,9 +9,13 @@ import java.util.List;
public interface IInstPerformanceDAO {
List<InstPerformance> getMultiple(long timeBucket, int applicationId);
int getMetric(int instanceId, long timeBucket);
int getTpsMetric(int instanceId, long timeBucket);
JsonArray getMetric(int instanceId, long startTimeBucket, long endTimeBucket);
JsonArray getTpsMetric(int instanceId, long startTimeBucket, long endTimeBucket);
int getRespTimeMetric(int instanceId, long timeBucket);
JsonArray getRespTimeMetric(int instanceId, long startTimeBucket, long endTimeBucket);
class InstPerformance {
private final int instanceId;
......
......@@ -7,5 +7,9 @@ import com.google.gson.JsonObject;
*/
public interface ISegmentCostDAO {
JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from);
String globalTraceId, int limit, int from, Sort sort);
public enum Sort {
Cost, Time
}
}
......@@ -18,7 +18,6 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
/**
......@@ -68,7 +67,7 @@ public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO {
return instPerformances;
}
@Override public int getMetric(int instanceId, long timeBucket) {
@Override public int getTpsMetric(int instanceId, long timeBucket) {
String id = timeBucket + Const.ID_SPLIT + instanceId;
GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get();
......@@ -78,13 +77,13 @@ public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO {
return 0;
}
@Override public JsonArray getMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
@Override public JsonArray getTpsMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
int i = 0;
do {
String id = (startTimeBucket + i) + Const.ID_SPLIT + instanceId;
prepareMultiGet.add(CpuMetricTable.TABLE, InstPerformanceTable.TABLE_TYPE, id);
prepareMultiGet.add(InstPerformanceTable.TABLE, InstPerformanceTable.TABLE_TYPE, id);
i++;
}
while (startTimeBucket + i <= endTimeBucket);
......@@ -100,4 +99,41 @@ public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO {
}
return metrics;
}
@Override public int getRespTimeMetric(int instanceId, long timeBucket) {
String id = timeBucket + Const.ID_SPLIT + instanceId;
GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get();
if (getResponse.isExists()) {
int callTimes = ((Number)getResponse.getSource().get(InstPerformanceTable.COLUMN_CALL_TIMES)).intValue();
int costTotal = ((Number)getResponse.getSource().get(InstPerformanceTable.COLUMN_COST_TOTAL)).intValue();
return costTotal / callTimes;
}
return 0;
}
@Override public JsonArray getRespTimeMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
int i = 0;
do {
String id = (startTimeBucket + i) + Const.ID_SPLIT + instanceId;
prepareMultiGet.add(InstPerformanceTable.TABLE, InstPerformanceTable.TABLE_TYPE, id);
i++;
}
while (startTimeBucket + i <= endTimeBucket);
JsonArray metrics = new JsonArray();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
int callTimes = ((Number)response.getResponse().getSource().get(InstPerformanceTable.COLUMN_CALL_TIMES)).intValue();
int costTotal = ((Number)response.getResponse().getSource().get(InstPerformanceTable.COLUMN_COST_TOTAL)).intValue();
metrics.add(costTotal / callTimes);
} else {
metrics.add(0);
}
}
return metrics;
}
}
......@@ -13,11 +13,19 @@ public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO {
return null;
}
@Override public int getMetric(int instanceId, long timeBucket) {
@Override public int getTpsMetric(int instanceId, long timeBucket) {
return 0;
}
@Override public JsonArray getMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
@Override public JsonArray getTpsMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
return null;
}
@Override public int getRespTimeMetric(int instanceId, long timeBucket) {
return 0;
}
@Override public JsonArray getRespTimeMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
return null;
}
}
......@@ -44,9 +44,7 @@ public class MemoryMetricEsDAO extends EsDAO implements IMemoryMetricDAO {
while (startTimeBucket + i <= endTimeBucket);
JsonObject metric = new JsonObject();
JsonArray usedMetric = new JsonArray();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
......@@ -57,6 +55,7 @@ public class MemoryMetricEsDAO extends EsDAO implements IMemoryMetricDAO {
usedMetric.add(0);
}
}
metric.add("used", usedMetric);
return metric;
}
}
......@@ -45,9 +45,7 @@ public class MemoryPoolMetricEsDAO extends EsDAO implements IMemoryPoolMetricDAO
while (startTimeBucket + i <= endTimeBucket);
JsonObject metric = new JsonObject();
JsonArray usedMetric = new JsonArray();
MultiGetResponse multiGetResponse = prepareMultiGet.get();
for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
if (response.getResponse().isExists()) {
......@@ -58,6 +56,7 @@ public class MemoryPoolMetricEsDAO extends EsDAO implements IMemoryPoolMetricDAO
usedMetric.add(0);
}
}
metric.add("used", usedMetric);
return metric;
}
}
......@@ -15,9 +15,9 @@ import org.elasticsearch.search.sort.SortOrder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.define.global.GlobalTraceTable;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
/**
* @author pengys5
......@@ -25,7 +25,7 @@ import org.skywalking.apm.collector.storage.define.segment.SegmentCostTable;
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
@Override public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from) {
String globalTraceId, int limit, int from, Sort sort) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(SegmentCostTable.TABLE);
searchRequestBuilder.setTypes(SegmentCostTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
......@@ -48,7 +48,11 @@ public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO {
mustQueryList.add(QueryBuilders.matchQuery(SegmentCostTable.COLUMN_SERVICE_NAME, operationName));
}
searchRequestBuilder.addSort(SegmentCostTable.COLUMN_COST, SortOrder.DESC);
if (Sort.Cost.equals(sort)) {
searchRequestBuilder.addSort(SegmentCostTable.COLUMN_COST, SortOrder.DESC);
} else if (Sort.Time.equals(sort)) {
searchRequestBuilder.addSort(SegmentCostTable.COLUMN_START_TIME, SortOrder.DESC);
}
searchRequestBuilder.setSize(limit);
searchRequestBuilder.setFrom(from);
......
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
*/
public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO {
@Override public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from) {
String globalTraceId, int limit, int from, Sort sort) {
return null;
}
}
......@@ -37,7 +37,7 @@ public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO {
boolQueryBuilder.must().add(QueryBuilders.matchQuery(ServiceEntryTable.COLUMN_APPLICATION_ID, applicationId));
}
if (StringUtils.isNotEmpty(entryServiceName)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, entryServiceName));
boolQueryBuilder.must().add(QueryBuilders.matchQuery(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, entryServiceName));
}
searchRequestBuilder.setQuery(boolQueryBuilder);
......
......@@ -17,6 +17,9 @@ import org.skywalking.apm.collector.ui.jetty.handler.TraceStackGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.UIJettyServerHandler;
import org.skywalking.apm.collector.ui.jetty.handler.application.ApplicationsGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.instancehealth.InstanceHealthGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.instancemetric.InstanceMetricGetOneTimeBucketHandler;
import org.skywalking.apm.collector.ui.jetty.handler.instancemetric.InstanceMetricGetRangeTimeBucketHandler;
import org.skywalking.apm.collector.ui.jetty.handler.instancemetric.InstanceOsInfoGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.servicetree.EntryServiceGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.servicetree.ServiceTreeGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.time.AllInstanceLastTimeGetHandler;
......@@ -64,6 +67,9 @@ public class UIJettyModuleDefine extends UIModuleDefine {
handlers.add(new AllInstanceLastTimeGetHandler());
handlers.add(new InstanceHealthGetHandler());
handlers.add(new ApplicationsGetHandler());
handlers.add(new InstanceOsInfoGetHandler());
handlers.add(new InstanceMetricGetOneTimeBucketHandler());
handlers.add(new InstanceMetricGetRangeTimeBucketHandler());
handlers.add(new EntryServiceGetHandler());
handlers.add(new ServiceTreeGetHandler());
return handlers;
......
......@@ -4,6 +4,7 @@ import com.google.gson.JsonElement;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.skywalking.apm.collector.ui.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.ui.service.SegmentTopService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -77,7 +78,15 @@ public class SegmentTopGetHandler extends JettyHandler {
operationName = req.getParameter("operationName");
}
return service.loadTop(startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from);
ISegmentCostDAO.Sort sort = ISegmentCostDAO.Sort.Cost;
if (req.getParameterMap().containsKey("sort")) {
String sortStr = req.getParameter("sort");
if (sortStr.toLowerCase().equals(ISegmentCostDAO.Sort.Time.name().toLowerCase())) {
sort = ISegmentCostDAO.Sort.Time;
}
}
return service.loadTop(startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from, sort);
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
......
......@@ -24,13 +24,13 @@ public class InstanceHealthGetHandler extends JettyHandler {
private InstanceHealthService service = new InstanceHealthService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
String timestampStr = req.getParameter("timestamp");
String timeBucketStr = req.getParameter("timeBucket");
String[] applicationIdsStr = req.getParameterValues("applicationIds");
logger.debug("instance health get timestamp: {}, applicationIdsStr: {}", timestampStr, applicationIdsStr);
logger.debug("instance health get timeBucket: {}, applicationIdsStr: {}", timeBucketStr, applicationIdsStr);
long timestamp;
long timeBucket;
try {
timestamp = Long.parseLong(timestampStr);
timeBucket = Long.parseLong(timeBucketStr);
} catch (NumberFormatException e) {
throw new ArgumentsParseException("timestamp must be long");
}
......@@ -45,12 +45,12 @@ public class InstanceHealthGetHandler extends JettyHandler {
}
JsonObject response = new JsonObject();
response.addProperty("timestamp", timestamp);
response.addProperty("timeBucket", timeBucket);
JsonArray appInstances = new JsonArray();
response.add("appInstances", appInstances);
for (int applicationId : applicationIds) {
appInstances.add(service.getInstances(timestamp, applicationId));
appInstances.add(service.getInstances(timeBucket, applicationId));
}
return response;
}
......
......@@ -23,18 +23,18 @@ public class OneInstanceLastTimeGetHandler extends JettyHandler {
private TimeSynchronousService service = new TimeSynchronousService();
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
String applicationInstanceIdStr = req.getParameter("applicationInstanceId");
logger.debug("applicationInstanceId: {}", applicationInstanceIdStr);
String instanceIdStr = req.getParameter("instanceId");
logger.debug("instanceId: {}", instanceIdStr);
int applicationInstanceId;
int instanceId;
try {
applicationInstanceId = Integer.parseInt(applicationInstanceIdStr);
instanceId = Integer.parseInt(instanceIdStr);
} catch (NumberFormatException e) {
throw new ArgumentsParseException("application instance id must be integer");
}
Long time = service.instanceLastTime(applicationInstanceId);
logger.debug("application instance id: {}, instance last time: {}", applicationInstanceId, time);
Long time = service.instanceLastTime(instanceId);
logger.debug("application instance id: {}, instance last time: {}", instanceId, time);
JsonObject timeJson = new JsonObject();
timeJson.addProperty("timeBucket", time);
return timeJson;
......
......@@ -18,11 +18,10 @@ public class InstanceHealthService {
private final Logger logger = LoggerFactory.getLogger(InstanceHealthService.class);
public JsonObject getInstances(long timestamp, int applicationId) {
public JsonObject getInstances(long timeBucket, int applicationId) {
JsonObject response = new JsonObject();
long secondTimeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(timestamp);
long s5TimeBucket = TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(secondTimeBucket);
long s5TimeBucket = TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(timeBucket);
IInstPerformanceDAO instPerformanceDAO = (IInstPerformanceDAO)DAOContainer.INSTANCE.get(IInstPerformanceDAO.class.getName());
List<IInstPerformanceDAO.InstPerformance> performances = instPerformanceDAO.getMultiple(s5TimeBucket, applicationId);
......
......@@ -14,9 +14,9 @@ public class SegmentTopService {
private final Logger logger = LoggerFactory.getLogger(SegmentTopService.class);
public JsonObject loadTop(long startTime, long endTime, long minCost, long maxCost, String operationName,
String globalTraceId, int limit, int from) {
String globalTraceId, int limit, int from, ISegmentCostDAO.Sort sort) {
logger.debug("startTime: {}, endTime: {}, minCost: {}, maxCost: {}, operationName: {}, globalTraceId: {}, limit: {}, from: {}", startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from);
ISegmentCostDAO segmentCostDAO = (ISegmentCostDAO)DAOContainer.INSTANCE.get(ISegmentCostDAO.class.getName());
return segmentCostDAO.loadTop(startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from);
return segmentCostDAO.loadTop(startTime, endTime, minCost, maxCost, operationName, globalTraceId, limit, from, sort);
}
}
......@@ -8,6 +8,9 @@ org.skywalking.apm.collector.ui.dao.ApplicationEsDAO
org.skywalking.apm.collector.ui.dao.ServiceNameEsDAO
org.skywalking.apm.collector.ui.dao.InstanceEsDAO
org.skywalking.apm.collector.ui.dao.InstPerformanceEsDAO
org.skywalking.apm.collector.ui.dao.CpuMetricEsDAO
org.skywalking.apm.collector.ui.dao.GCMetricEsDAO
org.skywalking.apm.collector.ui.dao.MemoryMetricEsDAO
org.skywalking.apm.collector.ui.dao.MemoryPoolMetricEsDAO
org.skywalking.apm.collector.ui.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.ui.dao.ServiceReferenceEsDAO
\ No newline at end of file
......@@ -23,8 +23,12 @@ public class JDBCDriverInterceptor implements InstanceMethodsAroundInterceptor {
@Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
return new SWConnection((String)allArguments[0],
(Properties)allArguments[1], (Connection)ret);
if (ret != null) {
return new SWConnection((String)allArguments[0],
(Properties)allArguments[1], (Connection)ret);
}
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册