提交 c1c39465 编写于 作者: G Gao Hongtao 提交者: GitHub

Merge branch 'master' into master

......@@ -12,7 +12,7 @@ Sky Walking | [English](README.md)
* 自动java探针,**不需要修改应用程序源代码**
* 高性能探针,针对单实例5000tps的应用,在**不需要采样的情况下**,只增加**10%**的CPU开销。
 * 高性能探针,针对单实例5000tps的应用,在**全量采集的情况下**,只增加**10%**的CPU开销。
* [中间件,框架与类库支持列表](https://github.com/wu-sheng/sky-walking/wiki/3.2-supported-list).
* 手动探针
* [使用OpenTracing手动探针API](http://opentracing.io/documentation/pages/supported-tracers)
......
......@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener, FirstSpanListener {
public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener,FirstSpanListener {
private final Logger logger = LoggerFactory.getLogger(SegmentCostSpanListener.class);
......@@ -50,10 +50,7 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanObject.getStartTime());
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
SegmentCostDataDefine.SegmentCost segmentCost = new SegmentCostDataDefine.SegmentCost();
segmentCost.setSegmentId(segmentId);
segmentCost.setApplicationId(applicationId);
......@@ -71,6 +68,11 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
isError = isError || spanObject.getIsError();
}
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanObject.getIsError();
}
@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
isError = isError || spanObject.getIsError();
......@@ -96,4 +98,4 @@ public class SegmentCostSpanListener implements EntrySpanListener, ExitSpanListe
}
}
}
}
\ No newline at end of file
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.agent.core.boot;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author zhangkewei
*/
public class DefaultNamedThreadFactory implements ThreadFactory {
private static final AtomicInteger BOOT_SERVICE_SEQ = new AtomicInteger(0);
private final AtomicInteger threadSeq = new AtomicInteger(0);
private final String namePrefix;
public DefaultNamedThreadFactory(String name) {
namePrefix = "SkywalkingAgent-" + BOOT_SERVICE_SEQ.incrementAndGet() + "-" + name + "-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r,namePrefix + threadSeq.getAndIncrement());
t.setDaemon(true);
return t;
}
}
......@@ -39,7 +39,7 @@ public class NoopSpan implements AbstractSpan {
}
@Override public AbstractSpan errorOccurred() {
return null;
return this;
}
public void finish() {
......
......@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
......@@ -57,7 +58,6 @@ public class JVMService implements BootService, Runnable {
private volatile ScheduledFuture<?> collectMetricFuture;
private volatile ScheduledFuture<?> sendMetricFuture;
private Sender sender;
@Override
public void beforeBoot() throws Throwable {
queue = new LinkedBlockingQueue(Config.Jvm.BUFFER_SIZE);
......@@ -68,10 +68,10 @@ public class JVMService implements BootService, Runnable {
@Override
public void boot() throws Throwable {
collectMetricFuture = Executors
.newSingleThreadScheduledExecutor()
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))
.scheduleAtFixedRate(this, 0, 1, TimeUnit.SECONDS);
sendMetricFuture = Executors
.newSingleThreadScheduledExecutor()
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))
.scheduleAtFixedRate(sender, 0, 1, TimeUnit.SECONDS);
}
......
......@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.skywalking.apm.agent.core.boot.ServiceManager;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
......@@ -89,7 +90,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
@Override
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor()
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(this, 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
......@@ -105,6 +106,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
@Override
public void run() {
logger.debug("AppAndServiceRegisterClient running, status:{}.",status);
boolean shouldTry = true;
while (CONNECTED.equals(status) && shouldTry) {
shouldTry = false;
......
......@@ -22,6 +22,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.skywalking.apm.agent.core.conf.Config;
/**
......@@ -39,7 +40,7 @@ public class CollectorDiscoveryService implements BootService {
@Override
public void boot() throws Throwable {
future = Executors.newSingleThreadScheduledExecutor()
future = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("CollectorDiscoveryService"))
.scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
}
......
......@@ -32,12 +32,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.skywalking.apm.logging.ILog;
import org.skywalking.apm.logging.LogManager;
import static org.skywalking.apm.agent.core.conf.Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL;
/**
* @author wusheng
*/
......@@ -58,8 +58,8 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(this, 0, GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(this, 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
......@@ -71,15 +71,18 @@ public class GRPCChannelManager implements BootService, Runnable {
public void shutdown() throws Throwable {
connectCheckFuture.cancel(true);
managedChannel.shutdownNow();
logger.debug("Selected collector grpc service shutdown.");
}
@Override
public void run() {
logger.debug("Selected collector grpc service running, reconnect:{}.",reconnect);
if (reconnect) {
if (RemoteDownstreamConfig.Collector.GRPC_SERVERS.size() > 0) {
int index = random.nextInt() % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
String server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
String server = "";
try {
int index = Math.abs(random.nextInt()) % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
String[] ipAndPort = server.split(":");
ManagedChannelBuilder<?> channelBuilder =
NettyChannelBuilder.forAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
......@@ -100,7 +103,7 @@ public class GRPCChannelManager implements BootService, Runnable {
}
}
logger.debug("Selected collector grpc service is not available. Wait {} seconds to retry", GRPC_CHANNEL_CHECK_INTERVAL);
logger.debug("Selected collector grpc service is not available. Wait {} seconds to retry", Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL);
}
}
......
......@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.skywalking.apm.agent.core.conf.Config;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.logging.ILog;
......@@ -63,7 +64,7 @@ public class SamplingService implements BootService {
on = true;
this.resetSamplingFactor();
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor();
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
scheduledFuture = service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.agent.core.boot;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
/**
* @author zhangkewei
*/
public class DefaultNamedThreadFactoryTest {
@Test
public void testNamedThread() throws Exception {
Thread newThread = new DefaultNamedThreadFactory("DefaultNamedThreadFactoryTest").newThread(new Runnable() {
@Override
public void run() {
}
});
newThread.start();
assertNotNull(newThread.getName());
assert(newThread.getName().contains("DefaultNamedThreadFactoryTest"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册