提交 82ccb8dc 编写于 作者: wu-sheng's avatar wu-sheng

Rename ServiceStarter to ServiceManager.

Finish the major codes of TraceSegmentProcessQueue and CollectorClientService.
上级 413f207e
package com.a.eye.skywalking.agent;
import com.a.eye.skywalking.agent.junction.SkyWalkingEnhanceMatcher;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.conf.SnifferConfigInitializer;
import com.a.eye.skywalking.api.logging.EasyLogResolver;
......@@ -54,7 +54,7 @@ public class SkyWalkingAgent {
final PluginFinder pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
new AgentBuilder.Default().type(enhanceClassMatcher(pluginFinder).and(not(isInterface()))).transform(new AgentBuilder.Transformer() {
public DynamicType.Builder<?> transform(DynamicType.Builder<?> builder, TypeDescription typeDescription, ClassLoader classLoader) {
......
......@@ -8,12 +8,12 @@ import java.util.Map;
import java.util.ServiceLoader;
/**
* The <code>ServiceStarter</code> bases on {@link ServiceLoader},
* The <code>ServiceManager</code> bases on {@link ServiceLoader},
* load all {@link BootService} implementations.
*
* @author wusheng
*/
public enum ServiceStarter {
public enum ServiceManager {
INSTANCE;
private static ILog logger = LogManager.getLogger(StatusBootService.class);
......@@ -31,7 +31,7 @@ public enum ServiceStarter {
bootService.bootUp();
bootedServices.put(bootService.getClass(), bootService);
} catch (Exception e) {
logger.error(e, "ServiceStarter try to start [{}] fail.", bootService.getClass().getName());
logger.error(e, "ServiceManager try to start [{}] fail.", bootService.getClass().getName());
}
}
} finally {
......
package com.a.eye.skywalking.api.client;
import com.a.eye.skywalking.api.boot.BootService;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment;
import java.util.List;
/**
* @author wusheng
*/
public class CollectorClientService implements BootService {
public class CollectorClientService implements Runnable, BootService {
private static ILog logger = LogManager.getLogger(CollectorClientService.class);
private static long NO_DATA_SLEEP_TIME_MILLIS = 500;
/**
* Start a new {@link Thread} to get finished {@link TraceSegment} by {@link TraceSegmentProcessQueue#getCachedTraceSegments()}
*/
@Override
public void bootUp() {
Thread collectorClientThread = new Thread(this, "collectorClientThread");
collectorClientThread.start();
}
@Override
public void run() {
while (true) {
try {
TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class);
List<TraceSegment> cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments();
if (cachedTraceSegments.size() > 0) {
for (TraceSegment segment : cachedTraceSegments) {
//TODO: wusheng
//send data
}
/**
* No sleep, when exist finished {@link TraceSegment}.
*/
} else {
try2Sleep(NO_DATA_SLEEP_TIME_MILLIS);
}
} catch (Throwable t) {
logger.error(t, "Send trace segments to collector failure.");
}
}
}
/**
* Try to sleep, and ignore the {@link InterruptedException}
*
* @param millis the length of time to sleep in milliseconds
*/
private void try2Sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
}
}
......@@ -16,7 +16,7 @@ public class Config {
}
public static class Disruptor{
public static int BUFFER_SIZE = 1024 * 4;
public static int BUFFER_SIZE = 512;
}
......
......@@ -19,6 +19,10 @@ public final class TraceSegmentHolder {
this.value = value;
}
public void clear(){
this.value = null;
}
public enum Factory implements EventFactory<TraceSegmentHolder> {
INSTANCE;
......
......@@ -4,10 +4,15 @@ import com.a.eye.skywalking.api.boot.StatusBootService;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.api.context.TracerContextListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.LinkedList;
import java.util.List;
/**
* {@link TraceSegmentProcessQueue} is a proxy of {@link Disruptor}, High Performance Inter-Thread MQ.
......@@ -16,12 +21,19 @@ import com.lmax.disruptor.util.DaemonThreadFactory;
*
* Created by wusheng on 2017/2/17.
*/
public class TraceSegmentProcessQueue extends StatusBootService implements TracerContextListener {
public class TraceSegmentProcessQueue extends StatusBootService implements TracerContextListener, EventHandler<TraceSegmentHolder> {
private static ILog logger = LogManager.getLogger(TraceSegmentProcessQueue.class);
private Disruptor<TraceSegmentHolder> disruptor;
private RingBuffer<TraceSegmentHolder> buffer;
private TraceSegment[] secondLevelCache;
private volatile int cacheIndex;
public TraceSegmentProcessQueue() {
disruptor = new Disruptor<>(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
secondLevelCache = new TraceSegment[Config.Disruptor.BUFFER_SIZE];
cacheIndex = 0;
disruptor.handleEventsWith(this);
buffer = disruptor.getRingBuffer();
}
......@@ -33,7 +45,7 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
@Override
public void afterFinished(TraceSegment traceSegment) {
if(isStarted()) {
if (isStarted()) {
long sequence = this.buffer.next(); // Grab the next sequence
try {
TraceSegmentHolder data = this.buffer.get(sequence);
......@@ -43,4 +55,42 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace
}
}
}
@Override
public void onEvent(TraceSegmentHolder event, long sequence, boolean endOfBatch) throws Exception {
TraceSegment traceSegment = event.getValue();
try {
if (secondLevelCache[cacheIndex] == null) {
secondLevelCache[cacheIndex] = traceSegment;
}else{
/**
* If your application has very high throughput(also called tps/qps),
* this log message will be output in very high frequency.
* And this is not suppose to happen. Disable log.warn or expend {@link Config.Disruptor.BUFFER_SIZE}
*/
logger.warn("TraceSegmentProcessQueue has data conflict. Discard the new TraceSegment.");
}
/**
* increase the {@link #cacheIndex}, if it is out of range, reset it.
*/
cacheIndex++;
if (cacheIndex == secondLevelCache.length) {
cacheIndex = 0;
}
} finally {
event.clear();
}
}
public List<TraceSegment> getCachedTraceSegments(){
List<TraceSegment> segmentList = new LinkedList<>();
for (int i = 0; i < secondLevelCache.length; i++) {
TraceSegment segment = secondLevelCache[i];
if(segment != null){
segmentList.add(segment);
secondLevelCache[i] = null;
}
}
return segmentList;
}
}
com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue
com.a.eye.skywalking.api.context.ContextManager
com.a.eye.skywalking.api.client.CollectorClientService
package com.a.eye.skywalking.plugin.dubbo;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.api.plugin.interceptor.EnhancedClassInstanceContext;
import com.a.eye.skywalking.api.plugin.interceptor.enhance.InstanceMethodInvokeContext;
......@@ -60,7 +60,7 @@ public class DubboInterceptorTest {
@Before
public void setUp() throws Exception {
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
dubboInterceptor = new DubboInterceptor();
testParam = new RequestParamForTestBelow283();
......
package com.a.eye.skywalking.plugin.httpClient.v4;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.api.plugin.interceptor.EnhancedClassInstanceContext;
import com.a.eye.skywalking.api.plugin.interceptor.enhance.InstanceMethodInvokeContext;
......@@ -58,7 +58,7 @@ public class HttpClientExecuteInterceptorTest {
public void setUp() throws Exception {
mockTracerContextListener = new MockTracerContextListener();
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
httpClientExecuteInterceptor = new HttpClientExecuteInterceptor();
PowerMockito.mock(HttpHost.class);
......
package com.a.eye.skywalking.plugin.jdbc;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.context.SegmentAssert;
import com.a.eye.skywalking.trace.LogData;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.mysql.cj.api.jdbc.JdbcConnection;
import org.hamcrest.CoreMatchers;
......@@ -90,7 +88,7 @@ public class SWCallableStatementTest extends AbstractStatementTest {
@Before
public void setUp() throws Exception {
mockTracerContextListener = new MockTracerContextListener();
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
swConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306/test", new Properties(), jdbcConnection);
multiHostConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306,127.0.0.1:3309/test", new Properties(), jdbcConnection);
......
package com.a.eye.skywalking.plugin.jdbc;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.context.SegmentAssert;
......@@ -47,7 +47,7 @@ public class SWConnectionTest extends AbstractStatementTest {
@Before
public void setUp() throws Exception {
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
mockTracerContextListener = new MockTracerContextListener();
swConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306/test", new Properties(), jdbcConnection);
multiHostConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306,127.0.0.1:3309/test", new Properties(), jdbcConnection);
......
package com.a.eye.skywalking.plugin.jdbc;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.context.SegmentAssert;
import com.a.eye.skywalking.trace.LogData;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.mysql.cj.api.jdbc.JdbcConnection;
import org.hamcrest.CoreMatchers;
......@@ -50,7 +48,7 @@ public class SWStatementTest extends AbstractStatementTest {
public void setUp() throws Exception {
mockTracerContextListener = new MockTracerContextListener();
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
swConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306/test", new Properties(), jdbcConnection);
multiHostConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306,127.0.0.1:3309/test", new Properties(), jdbcConnection);
......
package com.a.eye.skywalking.plugin.jdbc;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.context.SegmentAssert;
import com.a.eye.skywalking.trace.LogData;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.mysql.cj.api.jdbc.JdbcConnection;
import org.hamcrest.CoreMatchers;
......@@ -91,7 +89,7 @@ public class SwPreparedStatementTest extends AbstractStatementTest {
public void setUp() throws Exception {
mockTracerContextListener = new MockTracerContextListener();
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
swConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306/test", new Properties(), jdbcConnection);
multiHostConnection = new SWConnection("jdbc:mysql://127.0.0.1:3306,127.0.0.1:3309/test", new Properties(), jdbcConnection);
......
package com.a.eye.skywalking.plugin.jedis.v2;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.api.plugin.interceptor.EnhancedClassInstanceContext;
import com.a.eye.skywalking.api.plugin.interceptor.enhance.InstanceMethodInvokeContext;
......@@ -20,8 +20,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.sql.SQLException;
import static com.a.eye.skywalking.plugin.jedis.v2.JedisMethodInterceptor.KEY_OF_REDIS_HOST;
import static com.a.eye.skywalking.plugin.jedis.v2.JedisMethodInterceptor.KEY_OF_REDIS_HOSTS;
import static com.a.eye.skywalking.plugin.jedis.v2.JedisMethodInterceptor.KEY_OF_REDIS_PORT;
......@@ -30,7 +28,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
......@@ -47,7 +44,7 @@ public class JedisMethodInterceptorTest {
@Before
public void setUp() throws Exception {
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
interceptor = new JedisMethodInterceptor();
mockTracerContextListener = new MockTracerContextListener();
......
package com.a.eye.skywalking.plugin.motan;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.api.plugin.interceptor.EnhancedClassInstanceContext;
import com.a.eye.skywalking.api.plugin.interceptor.enhance.InstanceMethodInvokeContext;
import com.a.eye.skywalking.plugin.motan.define.MotanConsumerInstrumentation;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.context.SegmentAssert;
import com.a.eye.skywalking.trace.LogData;
......@@ -52,7 +51,7 @@ public class MotanConsumerInterceptorTest {
@Before
public void setUp() {
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
contextListener = new MockTracerContextListener();
invokeInterceptor = new MotanConsumerInterceptor();
......
package com.a.eye.skywalking.plugin.tomcat78x;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.api.plugin.interceptor.EnhancedClassInstanceContext;
import com.a.eye.skywalking.api.plugin.interceptor.enhance.InstanceMethodInvokeContext;
......@@ -50,7 +50,7 @@ public class TomcatInterceptorTest {
@Before
public void setUp() throws Exception {
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
tomcatInterceptor = new TomcatInterceptor();
contextListener = new MockTracerContextListener();
......
package com.a.eye.skywalking.sniffer.mock;
import com.a.eye.skywalking.api.boot.ServiceStarter;
import com.a.eye.skywalking.api.boot.ServiceManager;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.context.SegmentAssert;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
......@@ -15,7 +15,7 @@ import org.junit.Test;
public class MockTracerContextListenerTestCase {
@BeforeClass
public static void setup(){
ServiceStarter.INSTANCE.boot();
ServiceManager.INSTANCE.boot();
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册