From baf9bf694235cf1e7de2d642ee3dc1f2fa91ff7f Mon Sep 17 00:00:00 2001 From: wusheng Date: Mon, 20 Mar 2017 17:32:15 +0800 Subject: [PATCH] Finish CollectorClient code. It is almost ready to send TraceSegement (s) to collector in batch/fail-switch mode. --- .../eye/skywalking/agent/SkyWalkingAgent.java | 4 +- .../api/client/CollectorClient.java | 91 ++++++++++++++++++- .../api/client/RESTResponseStatusError.java | 13 +++ .../com/a/eye/skywalking/api/conf/Config.java | 12 ++- .../api/conf/SnifferConfigInitializer.java | 12 +-- .../skywalking/api/context/TracerContext.java | 4 +- .../api/logging/SyncFileWriter.java | 2 +- .../skywalking/api/logging/WriterFactory.java | 2 +- .../api/queue/TraceSegmentProcessQueue.java | 4 +- .../conf/SnifferConfigInitializerTest.java | 12 +-- .../api/logging/EasyLoggerTest.java | 2 +- .../api/logging/WriterFactoryTest.java | 6 +- .../src/test/resources/sky-walking.config | 4 +- .../plugin/dubbo/DubboInterceptorTest.java | 2 +- 14 files changed, 134 insertions(+), 36 deletions(-) create mode 100644 skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/RESTResponseStatusError.java diff --git a/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java b/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java index ea6ba500e..d2f2a659c 100644 --- a/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java +++ b/skywalking-sniffer/skywalking-agent/src/main/java/com/a/eye/skywalking/agent/SkyWalkingAgent.java @@ -99,8 +99,8 @@ public class SkyWalkingAgent { private static void initConfig() { - Config.SkyWalking.IS_PREMAIN_MODE = true; - Config.SkyWalking.AGENT_BASE_PATH = initAgentBasePath(); + Config.Agent.IS_PREMAIN_MODE = true; + Config.Agent.PATH = initAgentBasePath(); SnifferConfigInitializer.initialize(); } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java index 63e6a8b94..b77f95625 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/CollectorClient.java @@ -1,11 +1,21 @@ package com.a.eye.skywalking.api.client; import com.a.eye.skywalking.api.boot.ServiceManager; +import com.a.eye.skywalking.api.conf.Config; import com.a.eye.skywalking.api.queue.TraceSegmentProcessQueue; import com.a.eye.skywalking.logging.ILog; import com.a.eye.skywalking.logging.LogManager; +import com.a.eye.skywalking.trace.SegmentsMessage; import com.a.eye.skywalking.trace.TraceSegment; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.io.IOException; import java.util.List; +import java.util.Random; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; import org.apache.http.impl.client.HttpClients; @@ -21,11 +31,18 @@ public class CollectorClient implements Runnable { private static ILog logger = LogManager.getLogger(CollectorClient.class); private static long SLEEP_TIME_MILLIS = 500; private CloseableHttpClient httpclient; + private String[] serverList; + private volatile int selectedServer = -1; public CollectorClient() { + serverList = Config.Collector.SERVERS.split(","); httpclient = HttpClients.custom() .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy()) .build(); + Random r = new Random(); + if (serverList.length > 0) { + selectedServer = r.nextInt(serverList.length); + } } @Override @@ -36,13 +53,19 @@ public class CollectorClient implements Runnable { TraceSegmentProcessQueue segmentProcessQueue = ServiceManager.INSTANCE.findService(TraceSegmentProcessQueue.class); List cachedTraceSegments = segmentProcessQueue.getCachedTraceSegments(); if (cachedTraceSegments.size() > 0) { + SegmentsMessage message = null; + int count = 0; for (TraceSegment segment : cachedTraceSegments) { - /** - * No receiver found, means collector server is off-line. - */ - sleepTime = SLEEP_TIME_MILLIS * 10; - break; + if (message == null) { + message = new SegmentsMessage(); + } + message.append(segment); + if (count == Config.Collector.BATCH_SIZE) { + sendToCollector(message); + message = null; + } } + sendToCollector(message); } else { sleepTime = SLEEP_TIME_MILLIS; } @@ -56,6 +79,64 @@ public class CollectorClient implements Runnable { } } + /** + * Send the given {@link SegmentsMessage} to collector. + * + * @param message to be send. + */ + private void sendToCollector(SegmentsMessage message) throws RESTResponseStatusError, IOException { + if (message == null) { + return; + } + Gson gson = new GsonBuilder() + .excludeFieldsWithoutExposeAnnotation() + .create(); + String messageJson = gson.toJson(message); + + try { + HttpPost httpPost = ready2Send(messageJson); + if (httpPost != null) { + CloseableHttpResponse httpResponse = httpclient.execute(httpPost); + int statusCode = httpResponse.getStatusLine().getStatusCode(); + if (200 != statusCode) { + findBackupServer(); + throw new RESTResponseStatusError(statusCode); + } + } + } catch (IOException e) { + findBackupServer(); + throw e; + } + } + + /** + * Prepare the given message for HTTP Post service. + * + * @param messageJson to send + * @return {@link HttpPost}, when is ready to send. otherwise, null. + */ + private HttpPost ready2Send(String messageJson) { + if (selectedServer == -1) { + //no available server + return null; + } + HttpPost post = new HttpPost("http://" + serverList[selectedServer] + Config.Collector.SERVICE_NAME); + StringEntity entity = new StringEntity(messageJson, ContentType.APPLICATION_JSON); + post.setEntity(entity); + + return post; + } + + /** + * Choose the next server in {@link #serverList}, by moving {@link #selectedServer}. + */ + private void findBackupServer() { + selectedServer++; + if (selectedServer == serverList.length) { + selectedServer = 0; + } + } + /** * Try to sleep, and ignore the {@link InterruptedException} * diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/RESTResponseStatusError.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/RESTResponseStatusError.java new file mode 100644 index 000000000..db74873bd --- /dev/null +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/client/RESTResponseStatusError.java @@ -0,0 +1,13 @@ +package com.a.eye.skywalking.api.client; + +/** + * The RESTResponseStatusError represents the REST-Service client got an unexpected response code. + * Most likely, the response code is not 200. + * + * @author wusheng + */ +class RESTResponseStatusError extends Exception { + RESTResponseStatusError(int responseCode){ + super("Unexpected service response code: " + responseCode); + } +} diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java index e2cfae6d0..d13ad75e3 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/Config.java @@ -2,20 +2,24 @@ package com.a.eye.skywalking.api.conf; public class Config { - public static class SkyWalking { + public static class Agent { public static String APPLICATION_CODE = ""; public static boolean IS_PREMAIN_MODE = false; - public static String AGENT_BASE_PATH = ""; + public static String PATH = ""; + } + public static class Collector{ public static String SERVERS = ""; public static String SERVICE_NAME = "/segments"; + + public static int BATCH_SIZE = 50; } - public static class Disruptor{ - public static int BUFFER_SIZE = 512; + public static class Buffer { + public static int SIZE = 512; } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java index 6b63b2a44..9c9fd2f17 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializer.java @@ -15,7 +15,7 @@ public class SnifferConfigInitializer { public static void initialize() { InputStream configFileStream; - if (Config.SkyWalking.IS_PREMAIN_MODE) { + if (Config.Agent.IS_PREMAIN_MODE) { configFileStream = fetchAuthFileInputStream(); } else { configFileStream = SnifferConfigInitializer.class.getResourceAsStream("/sky-walking.config"); @@ -35,24 +35,24 @@ public class SnifferConfigInitializer { String applicationCode = System.getProperty("applicationCode"); if (!StringUtil.isEmpty(applicationCode)) { - Config.SkyWalking.APPLICATION_CODE = applicationCode; + Config.Agent.APPLICATION_CODE = applicationCode; } String servers = System.getProperty("servers"); if(!StringUtil.isEmpty(servers)) { - Config.SkyWalking.SERVERS = servers; + Config.Collector.SERVERS = servers; } - if (StringUtil.isEmpty(Config.SkyWalking.APPLICATION_CODE)) { + if (StringUtil.isEmpty(Config.Agent.APPLICATION_CODE)) { throw new ExceptionInInitializerError("'-DapplicationCode=' is missing."); } - if (StringUtil.isEmpty(Config.SkyWalking.SERVERS)) { + if (StringUtil.isEmpty(Config.Collector.SERVERS)) { throw new ExceptionInInitializerError("'-Dservers=' is missing."); } } private static InputStream fetchAuthFileInputStream() { try { - return new FileInputStream(Config.SkyWalking.AGENT_BASE_PATH + File.separator + "sky-walking.config"); + return new FileInputStream(Config.Agent.PATH + File.separator + "sky-walking.config"); } catch (Exception e) { logger.warn("sky-walking.config is missing, use default config."); return null; diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java index e484e0d4d..6229bd10e 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/context/TracerContext.java @@ -34,7 +34,7 @@ public final class TracerContext { * Create a {@link TraceSegment} and init {@link #spanIdGenerator} as 0; */ TracerContext() { - this.segment = new TraceSegment(Config.SkyWalking.APPLICATION_CODE); + this.segment = new TraceSegment(Config.Agent.APPLICATION_CODE); this.spanIdGenerator = 0; } @@ -123,7 +123,7 @@ public final class TracerContext { public void inject(ContextCarrier carrier) { carrier.setTraceSegmentId(this.segment.getTraceSegmentId()); carrier.setSpanId(this.activeSpan().getSpanId()); - carrier.setApplicationCode(Config.SkyWalking.APPLICATION_CODE); + carrier.setApplicationCode(Config.Agent.APPLICATION_CODE); carrier.setPeerHost(Tags.PEER_HOST.get(activeSpan())); carrier.setDistributedTraceIds(this.segment.getRelatedGlobalTraces()); } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java index 99d081143..72b305f2b 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/SyncFileWriter.java @@ -19,7 +19,7 @@ public class SyncFileWriter implements IWriter { private SyncFileWriter() { try { - File logFilePath = new File(Config.SkyWalking.AGENT_BASE_PATH, Config.Logging.LOG_DIR_NAME); + File logFilePath = new File(Config.Agent.PATH, Config.Logging.LOG_DIR_NAME); if (!logFilePath.exists()) { logFilePath.mkdirs(); } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java index fc38edfee..6e5bd4230 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/logging/WriterFactory.java @@ -4,7 +4,7 @@ import com.a.eye.skywalking.api.conf.Config; public class WriterFactory { public static IWriter getLogWriter(){ - if (Config.SkyWalking.IS_PREMAIN_MODE){ + if (Config.Agent.IS_PREMAIN_MODE){ return SyncFileWriter.instance(); }else{ return new STDOutWriter(); diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java index d868de9f4..9d25f94ff 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/api/queue/TraceSegmentProcessQueue.java @@ -30,8 +30,8 @@ public class TraceSegmentProcessQueue extends StatusBootService implements Trace private volatile int cacheIndex; public TraceSegmentProcessQueue() { - disruptor = new Disruptor(TraceSegmentHolder.Factory.INSTANCE, Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE); - secondLevelCache = new TraceSegment[Config.Disruptor.BUFFER_SIZE]; + disruptor = new Disruptor(TraceSegmentHolder.Factory.INSTANCE, Config.Buffer.SIZE, DaemonThreadFactory.INSTANCE); + secondLevelCache = new TraceSegment[Config.Buffer.SIZE]; cacheIndex = 0; disruptor.handleEventsWith(this); buffer = disruptor.getRingBuffer(); diff --git a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java index 4804c74ba..befd85f85 100644 --- a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java +++ b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/conf/SnifferConfigInitializerTest.java @@ -11,13 +11,13 @@ public class SnifferConfigInitializerTest { @Test public void testInitialize(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; SnifferConfigInitializer.initialize(); - Assert.assertEquals("crmApp", Config.SkyWalking.APPLICATION_CODE); - Assert.assertEquals("127.0.0.1:8080", Config.SkyWalking.SERVERS); + Assert.assertEquals("crmApp", Config.Agent.APPLICATION_CODE); + Assert.assertEquals("127.0.0.1:8080", Config.Collector.SERVERS); - Assert.assertNotNull(Config.Disruptor.BUFFER_SIZE); + Assert.assertNotNull(Config.Buffer.SIZE); Assert.assertNotNull(Config.Logging.LOG_DIR_NAME); Assert.assertNotNull(Config.Logging.LOG_FILE_NAME); Assert.assertNotNull(Config.Logging.MAX_LOG_FILE_LENGTH); @@ -26,12 +26,12 @@ public class SnifferConfigInitializerTest { @Test(expected = ExceptionInInitializerError.class) public void testErrorInitialize(){ - Config.SkyWalking.IS_PREMAIN_MODE = true; + Config.Agent.IS_PREMAIN_MODE = true; SnifferConfigInitializer.initialize(); } @AfterClass public static void reset(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; } } diff --git a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java index 0e6cd274c..854798abc 100644 --- a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java +++ b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/EasyLoggerTest.java @@ -26,7 +26,7 @@ public class EasyLoggerTest { @Test public void testLog(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; PrintStream output = Mockito.mock(PrintStream.class); System.setOut(output); diff --git a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java index eec580928..d7adefd40 100644 --- a/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java +++ b/skywalking-sniffer/skywalking-api/src/test/java/com/a/eye/skywalking/api/logging/WriterFactoryTest.java @@ -25,18 +25,18 @@ public class WriterFactoryTest { */ @Test public void testGetLogWriter(){ - Config.SkyWalking.IS_PREMAIN_MODE = true; + Config.Agent.IS_PREMAIN_MODE = true; PrintStream mockStream = Mockito.mock(PrintStream.class); System.setErr(mockStream); Assert.assertEquals(SyncFileWriter.instance(), WriterFactory.getLogWriter()); - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; Assert.assertTrue(WriterFactory.getLogWriter() instanceof STDOutWriter); } @AfterClass public static void reset(){ - Config.SkyWalking.IS_PREMAIN_MODE = false; + Config.Agent.IS_PREMAIN_MODE = false; System.setErr(errRef); } } diff --git a/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config b/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config index 7817dd446..b563dfb11 100644 --- a/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config +++ b/skywalking-sniffer/skywalking-api/src/test/resources/sky-walking.config @@ -1,2 +1,2 @@ -skywalking.application_code = crmApp -skywalking.servers = 127.0.0.1:8080 +agent.application_code = crmApp +collector.servers = 127.0.0.1:8080 diff --git a/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java b/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java index 4e479b039..9f03be8bf 100644 --- a/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java +++ b/skywalking-sniffer/skywalking-sdk-plugin/dubbo-plugin/src/test/java/com/a/eye/skywalking/plugin/dubbo/DubboInterceptorTest.java @@ -80,7 +80,7 @@ public class DubboInterceptorTest { Mockito.when(RpcContext.getContext()).thenReturn(rpcContext); when(rpcContext.isConsumerSide()).thenReturn(true); when(methodInvokeContext.allArguments()).thenReturn(new Object[]{invoker, invocation}); - Config.SkyWalking.APPLICATION_CODE = "DubboTestCases-APP"; + Config.Agent.APPLICATION_CODE = "DubboTestCases-APP"; } -- GitLab