diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java index b4bafe5bacbedd5107c087c43c90fca3ae2986ec..105cfde5e6c0a8a8f54848f1196fded5717da75e 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/AgentPackagePath.java @@ -18,14 +18,18 @@ package org.apache.skywalking.apm.agent.core.boot; -import java.net.URISyntaxException; -import org.apache.skywalking.apm.agent.core.logging.api.ILog; -import org.apache.skywalking.apm.agent.core.logging.api.LogManager; - import java.io.File; import java.net.MalformedURLException; +import java.net.URISyntaxException; import java.net.URL; +import org.apache.skywalking.apm.agent.core.logging.api.ILog; +import org.apache.skywalking.apm.agent.core.logging.api.LogManager; +/** + * AgentPackagePath is a flag and finder to locate the SkyWalking agent.jar. It gets the absolute path of the agent jar. + * The path is the required metadata for agent core looking up the plugins and toolkit activations. If the lookup + * mechanism fails, the agent will exit directly. + */ public class AgentPackagePath { private static final ILog logger = LogManager.getLogger(AgentPackagePath.class); @@ -67,7 +71,8 @@ public class AgentPackagePath { } } else { int prefixLength = "file:".length(); - String classLocation = urlString.substring(prefixLength, urlString.length() - classResourcePath.length()); + String classLocation = urlString.substring( + prefixLength, urlString.length() - classResourcePath.length()); return new File(classLocation); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index 00acefb56704f14d4bcc347b5469dd7b30ea6e13..c9ceccb19f082345cfbfd62467f96284d753079b 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -117,7 +117,25 @@ public class CoreModuleConfig extends ModuleConfig { return dataTTLConfig; } + /** + * OAP server could work in different roles. + */ public enum Role { - Mixed, Receiver, Aggregator + /** + * Default role. OAP works as the {@link #Receiver} and {@link #Aggregator} + */ + Mixed, + /** + * Receiver mode OAP open the service to the agents, analysis and aggregate the results and forward the results + * to {@link #Mixed} and {@link #Aggregator} roles OAP. The only exception is for {@link + * org.apache.skywalking.oap.server.core.analysis.record.Record}, they don't require 2nd round distributed + * aggregation, is being pushed into the storage from the receiver OAP directly. + */ + Receiver, + /** + * Aggregator mode OAP receives data from {@link #Mixed} and {@link #Aggregator} OAP nodes, and do 2nd round + * aggregation. Then save the final result to the storage. + */ + Aggregator } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 8f8d2b9dd128bd50a32f1fcf7211d78c6c6ffe01..2034a723ef7a36bba4f60fc72b26ea8f55761c1c 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -268,10 +268,10 @@ public class CoreModuleProvider extends ModuleProvider { if (CoreModuleConfig.Role.Mixed.name() .equalsIgnoreCase( - moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name() - .equalsIgnoreCase( - moduleConfig - .getRole())) { + moduleConfig.getRole()) + || CoreModuleConfig.Role.Aggregator.name() + .equalsIgnoreCase( + moduleConfig.getRole())) { RemoteInstance gRPCServerInstance = new RemoteInstance( new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true)); this.getManager() diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java index 6e85f804971918258babe193c46e4fedf871efd5..8f52e85e4fccb6a79c0521061ca3614078f313e0 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/SourceDispatcher.java @@ -20,6 +20,15 @@ package org.apache.skywalking.oap.server.core.analysis; import org.apache.skywalking.oap.server.core.source.Source; +/** + * SourceDispatcher implementation processes different types of the source. There are two kinds of the source + * dispatcher. All implementations are doing field values set/get to transfer the data to the streaming process. + * + * One is hard coded, which could be found through the hierarchy tree. The other is generated by OAL engine based on the + * templates insides oal-rt/src/main/resources/code-templates/dispatcher + * + * @param the data type of this dispatcher processes. + */ public interface SourceDispatcher { void dispatch(SOURCE source); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java index de99afb482f85cbccdea8aabe84cb416ee7285a7..334e5e4a38610c08e543fe256a02a18a317c97b7 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiver.java @@ -20,6 +20,10 @@ package org.apache.skywalking.oap.server.core.source; import org.apache.skywalking.oap.server.library.module.Service; +/** + * The source receiver implementation delegates to {@link org.apache.skywalking.oap.server.core.analysis.DispatcherManager} + * in order to forward source to the suitable real {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}. + */ public interface SourceReceiver extends Service { void receive(Source source); } diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java index d2ac97d0aa8b20d762eb57e9ff7710e7d021e4a6..6621a4274e05a7d68aab215b00333ff0aebbe780 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStream.java @@ -28,6 +28,11 @@ import org.apache.commons.io.filefilter.PrefixFileFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * DataStream provides the open APIs for the local file based cache system. + * + * @param type of data in the cache file. + */ class DataStream { private static final Logger logger = LoggerFactory.getLogger(DataStream.class); diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java index bb9e14249ba4a34b4d8d0b4213c3cc280efb16c2..c4161982072f732efb4e295631c4a8b874dfdbe7 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamReader.java @@ -29,17 +29,23 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.PrefixFileFilter; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.apm.util.StringUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * DataStreamReader represents the reader of the local file based cache provided by {@link DataStream}. It reads the + * data in the local cached file, and triggers the registered callback to process, also, provide the retry if the + * callback responses the process status is unsuccessful. + * + * This callback/retry mechanism is used in inventory register for multiple receivers. + * + * @param type of data in the cache file. + */ +@Slf4j public class DataStreamReader { - - private static final Logger logger = LoggerFactory.getLogger(DataStreamReader.class); - private final File directory; private final Offset.ReadOffset readOffset; private final Parser parser; @@ -50,7 +56,7 @@ public class DataStreamReader { private InputStream inputStream; DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser parser, - CallBack callBack) { + CallBack callBack) { this.directory = directory; this.readOffset = readOffset; this.parser = parser; @@ -62,7 +68,8 @@ public class DataStreamReader { preRead(); Executors.newSingleThreadScheduledExecutor() - .scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::read, t -> logger.error("Buffer data pre read failure.", t)), 3, 1, TimeUnit.SECONDS); + .scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::read, t -> log.error( + "Buffer data pre read failure.", t)), 3, 1, TimeUnit.SECONDS); } private void preRead() { @@ -76,7 +83,7 @@ public class DataStreamReader { try { inputStream.skip(readOffset.getOffset()); } catch (IOException e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } } else { openInputStream(readEarliestDataFile()); @@ -93,7 +100,7 @@ public class DataStreamReader { inputStream = new FileInputStream(readingFile); } catch (IOException e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } } @@ -111,8 +118,8 @@ public class DataStreamReader { } private void read() { - if (logger.isDebugEnabled()) { - logger.debug("Read buffer data"); + if (log.isDebugEnabled()) { + log.debug("Read buffer data"); } try { @@ -137,8 +144,8 @@ public class DataStreamReader { bufferDataCollection.add(bufferData); } - if (logger.isDebugEnabled()) { - logger.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize); + if (log.isDebugEnabled()) { + log.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize); } } else if (bufferDataCollection.size() > 0) { reCall(); @@ -146,7 +153,7 @@ public class DataStreamReader { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } } } @@ -155,7 +162,7 @@ public class DataStreamReader { reCall(); } } catch (IOException e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } } @@ -175,7 +182,7 @@ public class DataStreamReader { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } } else { break; @@ -183,6 +190,11 @@ public class DataStreamReader { } } + /** + * Callback when reader fetched data from the local file. + * + * @param type of data in the cache file. + */ public interface CallBack { boolean call(BufferData bufferData); } diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java index 021ed6cc96dcf80a2478fc00bba6ce5aa1a6c532..6c39c89c3313fc870be83319d7298e411fda0527 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/DataStreamWriter.java @@ -23,15 +23,18 @@ import com.google.protobuf.GeneratedMessageV3; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.skywalking.apm.util.StringUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * DataStreamReader represents the writer of the local file based cache provided by {@link DataStream}. It writes the + * given messages into the local files, and create new if necessary or file max size reached. + * + * @param type of data in the cache file. + */ +@Slf4j class DataStreamWriter { - - private static final Logger logger = LoggerFactory.getLogger(DataStreamWriter.class); - private final File directory; private final Offset.WriteOffset writeOffset; @@ -71,9 +74,9 @@ class DataStreamWriter { boolean created = writingFile.createNewFile(); if (!created) { - logger.info("The file named {} already exists.", writingFile.getAbsolutePath()); + log.info("The file named {} already exists.", writingFile.getAbsolutePath()); } else { - logger.info("Create a new buffer data file: {}", writingFile.getAbsolutePath()); + log.info("Create a new buffer data file: {}", writingFile.getAbsolutePath()); } writeOffset.setOffset(0); @@ -93,7 +96,7 @@ class DataStreamWriter { outputStream = FileUtils.openOutputStream(writingFile, true); } } catch (IOException e) { - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); } } } diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java index 488783a576c98936cb9edc39efeb34ab82cd77bf..54f17dd102c61a9e2e2db3bd00534c0127e096bf 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/Offset.java @@ -22,6 +22,9 @@ import lombok.Getter; import lombok.Setter; import org.apache.skywalking.apm.util.StringUtil; +/** + * File content offset definition. Offset is the pointer when read/write the local file. + */ class Offset { private static final String SPLIT_CHARACTER = ","; @@ -35,11 +38,20 @@ class Offset { readOffset = new ReadOffset(writeOffset); } + /** + * @return the offset data into a single literal string for the persistence. + */ String serialize() { - return readOffset.getFileName() + SPLIT_CHARACTER + String.valueOf(readOffset.getOffset()) + SPLIT_CHARACTER + writeOffset + return readOffset.getFileName() + SPLIT_CHARACTER + String.valueOf( + readOffset.getOffset()) + SPLIT_CHARACTER + writeOffset .getFileName() + SPLIT_CHARACTER + String.valueOf(writeOffset.getOffset()); } + /** + * Initialize the Offset object by given value. + * + * @param value serialized Offset + */ void deserialize(String value) { if (!StringUtil.isEmpty(value)) { String[] values = value.split(SPLIT_CHARACTER); diff --git a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java index c2fb63ede86996ff20aa41362ac74362b213a2dc..237b3374e3e010fa1a9667f59a04e6cd42743680 100644 --- a/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java +++ b/oap-server/server-library/library-buffer/src/main/java/org/apache/skywalking/oap/server/library/buffer/OffsetStream.java @@ -34,6 +34,10 @@ import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * OffsetStream is driven by the internal timer. Flush the hold read and write offset into the file. And restore the + * data from the same file in the initialization process.o + */ class OffsetStream { private static final Logger logger = LoggerFactory.getLogger(OffsetStream.class); @@ -79,7 +83,9 @@ class OffsetStream { initialized = true; Executors.newSingleThreadScheduledExecutor() - .scheduleAtFixedRate(new RunnableWithExceptionProtection(this::flush, t -> logger.error("Flush offset file in background failure.", t)), 2, 1, TimeUnit.SECONDS); + .scheduleAtFixedRate( + new RunnableWithExceptionProtection(this::flush, t -> logger.error( + "Flush offset file in background failure.", t)), 2, 1, TimeUnit.SECONDS); } } @@ -122,7 +128,8 @@ class OffsetStream { } private String readLastLine() throws IOException { - ReversedLinesFileReader reader = new ReversedLinesFileReader(offsetFile, Charset.forName(BufferFileUtils.CHARSET)); + ReversedLinesFileReader reader = new ReversedLinesFileReader( + offsetFile, Charset.forName(BufferFileUtils.CHARSET)); return reader.readLine(); } } diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/GRPCStreamStatus.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/GRPCStreamStatus.java index 06852f642f694a5a8f8c9b614ca3f34a86f32001..cd398963a82f9f63f5323d3d73d218ecaac5a980 100644 --- a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/GRPCStreamStatus.java +++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/GRPCStreamStatus.java @@ -20,6 +20,10 @@ package org.apache.skywalking.oap.server.library.util; import lombok.Getter; +/** + * GRPCStreamStatus is used for gRPC streaming client. It helps to make sure the gRPC client to wait the last streaming + * has the onCompleted or onError confirmation. + */ @Getter public class GRPCStreamStatus { diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java index 375576eb2616d72dce63e48805122ac4567aad29..084e098f33ce60bd7cf8202d5c6e160fcaa41fd8 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java @@ -26,9 +26,15 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; @Setter public class SharingServerConfig extends ModuleConfig { private String restHost; + /** + * Only setting the real port(not 0) makes the jetty server online. + */ private int restPort; private String restContextPath; private String gRPCHost; + /** + * Only setting the real port(not 0) makes the gRPC server online. + */ private int gRPCPort; private int maxConcurrentCallsPerConnection; private int maxMessageSize; diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModule.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModule.java index b3f3f286a3ef458c9bdda6f6d5794a9dac868090..076bae39a414358b4e43eab06c7eda4784cb79db 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModule.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModule.java @@ -22,6 +22,16 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleDefine; +/** + * Sharing server is an independent gRPC and Jetty servers provided for all receiver modules. In default, this module + * would not be activated unless the user active explicitly. It only delegates the core gRPC and Jetty servers. + * + * Once it is activated, provides separated servers, then all receivers use these to accept outside requests. Typical, + * this is activated to avoid the ip, port and thread pool sharing between receiver and internal traffics. For security + * consideration, receiver should open TLS and token check, and internal(remote module) traffic should base on trusted + * network, no TLS and token check. Even some companies may require TLS internally, it still use different TLS keys. In + * this specific case, we recommend users to consider use {@link org.apache.skywalking.oap.server.core.CoreModuleConfig.Role}. + */ public class SharingServerModule extends ModuleDefine { public static final String NAME = "receiver-sharing-server"; diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java index 4e199d5d5fd384720f06a314d47cd2bc60a60f61..66cf9a17f5313835c1c8100837a1b27236e3df26 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java @@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; import org.apache.skywalking.oap.server.library.module.Service; +/** + * Service of trace segment parser. + */ public interface ISegmentParserService extends Service { void send(UpstreamSegment segment); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java index af335803a48640de046bdd7e4e51155a52d1fc3c..4d970b6bd5861bfb60b9727f75755f4135ffd7b7 100755 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParseV2.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.stream.Collectors; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.ProtocolVersion; import org.apache.skywalking.apm.network.language.agent.SpanType; import org.apache.skywalking.apm.network.language.agent.UniqueId; @@ -53,16 +54,24 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * SegmentParseV2 is a replication of SegmentParse, but be compatible with v2 trace protocol. + * SegmentParseV2 replaced the SegmentParse(V1 is before 6.0.0) to drive the segment analysis. It includes the following + * steps + * + * 1. Register data, name to ID register + * + * 2. If register unfinished, cache in the local buffer file. And back to (1). + * + * 3. If register finished, traverse the span and analysis by the given {@link SpanListener}s. + * + * 4. Notify the build event to all {@link SpanListener}s in order to forward all built sources into dispatchers. + * + * @since 6.0.0 In the 6.x, the V1 and V2 analysis both exist. + * @since 7.0.0 SegmentParse(V1) has been removed permanently. */ +@Slf4j public class SegmentParseV2 { - - private static final Logger logger = LoggerFactory.getLogger(SegmentParseV2.class); - private final ModuleManager moduleManager; private final List spanListeners; private final SegmentParserListenerManager listenerManager; @@ -76,7 +85,7 @@ public class SegmentParseV2 { private volatile static CounterMetrics TRACE_PARSE_ERROR; private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, - TraceServiceModuleConfig config) { + TraceServiceModuleConfig config) { this.moduleManager = moduleManager; this.listenerManager = listenerManager; this.spanListeners = new LinkedList<>(); @@ -90,9 +99,19 @@ public class SegmentParseV2 { MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) .provider() .getService(MetricsCreator.class); - TRACE_BUFFER_FILE_RETRY = metricsCreator.createCounter("v6_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); - TRACE_BUFFER_FILE_OUT = metricsCreator.createCounter("v6_trace_buffer_file_out", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); - TRACE_PARSE_ERROR = metricsCreator.createCounter("v6_trace_parse_error", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE); + TRACE_BUFFER_FILE_RETRY = metricsCreator.createCounter( + "v6_trace_buffer_file_retry", + "The number of retry trace segment from the buffer file, but haven't registered successfully.", + MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE + ); + TRACE_BUFFER_FILE_OUT = metricsCreator.createCounter( + "v6_trace_buffer_file_out", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY, + MetricsTag.EMPTY_VALUE + ); + TRACE_PARSE_ERROR = metricsCreator.createCounter( + "v6_trace_parse_error", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY, + MetricsTag.EMPTY_VALUE + ); } this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME) @@ -116,15 +135,19 @@ public class SegmentParseV2 { // Recheck in case that the segment comes from file buffer final int serviceInstanceId = segmentObject.getServiceInstanceId(); if (serviceInstanceInventoryCache.get(serviceInstanceId) == null) { - logger.warn("Cannot recognize service instance id [{}] from cache, segment will be ignored", serviceInstanceId); + log.warn( + "Cannot recognize service instance id [{}] from cache, segment will be ignored", serviceInstanceId); return true; // to mark it "completed" thus won't be retried } SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject); if (!preBuild(traceIds, segmentDecorator)) { - if (logger.isDebugEnabled()) { - logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId()); + if (log.isDebugEnabled()) { + log.debug( + "This segment id exchange not success, write to buffer file, id: {}", + segmentCoreInfo.getSegmentId() + ); } if (source.equals(SegmentSource.Agent)) { @@ -135,8 +158,8 @@ public class SegmentParseV2 { } return false; } else { - if (logger.isDebugEnabled()) { - logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId()); + if (log.isDebugEnabled()) { + log.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId()); } notifyListenerToBuild(); @@ -144,7 +167,7 @@ public class SegmentParseV2 { } } catch (Throwable e) { TRACE_PARSE_ERROR.inc(); - logger.error(e.getMessage(), e); + log.error(e.getMessage(), e); return true; } } @@ -213,8 +236,8 @@ public class SegmentParseV2 { } else if (SpanType.Local.equals(spanDecorator.getSpanType())) { notifyLocalListener(spanDecorator); } else { - logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType() - .name()); + log.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType() + .name()); } } } @@ -223,8 +246,8 @@ public class SegmentParseV2 { } private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { - if (logger.isDebugEnabled()) { - logger.debug("push to segment buffer write worker, id: {}", id); + if (log.isDebugEnabled()) { + log.debug("push to segment buffer write worker, id: {}", id); } SegmentStandardization standardization = new SegmentStandardization(id); @@ -279,7 +302,8 @@ public class SegmentParseV2 { private void createSpanListeners() { listenerManager.getSpanListenerFactories() - .forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config))); + .forEach( + spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config))); } public static class Producer implements DataStreamReader.CallBack { @@ -291,7 +315,7 @@ public class SegmentParseV2 { private final TraceServiceModuleConfig config; public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager, - TraceServiceModuleConfig config) { + TraceServiceModuleConfig config) { this.moduleManager = moduleManager; this.listenerManager = listenerManager; this.config = config; diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java index 35a54dd5c8cb873bdeace5c27d2c959475cf15fe..f22fc49e297d3ed61365ce23916d0e832a8e164a 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java @@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; +/** + * The open service to the receivers. Segment parser for v2 trace protocol. + */ public class SegmentParserServiceImpl implements ISegmentParserService { private final SegmentParseV2.Producer segmentProducer; diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentSource.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentSource.java index 0b5b87becb3a2018633a3a3cc78e171aacd2c4cf..eaef1e1e9fef25ee8414e6e68730bd0408f4edf5 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentSource.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentSource.java @@ -18,6 +18,16 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser; +/** + * The segment source. + */ public enum SegmentSource { - Agent, Buffer + /** + * From the client side agent. + */ + Agent, + /** + * From the buffer file, because the last time register has not be successful. + */ + Buffer } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java index dc07d24526ebbeccfedd138cc9bff2f142864a5c..8632ba6efe82a20782336b648ba7d3e08a4c77f5 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SpanDecorator.java @@ -26,6 +26,10 @@ import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2; import static java.util.Objects.isNull; +/** + * SpanDecorator is used in the metadata register process, and provides an easy access way consistently, no matter + * before or after the register. + */ public class SpanDecorator implements StandardBuilder { private boolean isOrigin = true; private final StandardBuilder standardBuilder; diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/StandardBuilder.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/StandardBuilder.java index 1dc6932f1dadb4df74a7a2551eeaf9667c980c49..f9de494e5616c3161af8b5094195a3ae53040fe5 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/StandardBuilder.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/StandardBuilder.java @@ -18,6 +18,12 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator; +/** + * The implementation supports immutable data format to mutable builder transformation. + */ public interface StandardBuilder { + /** + * Change the status to be mutable if it hasn't. Keep in the mutable status when this is called multiple times. + */ void toBuilder(); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/EntrySpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/EntrySpanListener.java index 8d1c9ee2ea60813205c83ffbd1f1a251df8b1132..2f88ed1b1c7fbee936c83b3d0416a71f4db20b98 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/EntrySpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/EntrySpanListener.java @@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; +/** + * SpanListener for Entry span. + */ public interface EntrySpanListener extends SpanListener { void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo); } \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/ExitSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/ExitSpanListener.java index da3aea4bd34bc309c0a2e084884477265d52b632..17ebf0105283414cad91c996ce59e6f2bd6ae44d 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/ExitSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/ExitSpanListener.java @@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; +/** + * SpanListener for exit span. + */ public interface ExitSpanListener extends SpanListener { void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/FirstSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/FirstSpanListener.java index 8d7a289cb6e432351360a5719bd7cbd39f4f3fa6..396c44c312a15da72a86550e63a4d7c2aa11b8af 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/FirstSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/FirstSpanListener.java @@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; +/** + * SpanListener for the first span in the segment. The first span means span id is 0. + */ public interface FirstSpanListener extends SpanListener { void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/GlobalTraceIdsListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/GlobalTraceIdsListener.java index 8a62852611ebc458c91df617d5f5bdfd8ea60a51..e7bab8262817482c06197c9629ed38b0dcf52935 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/GlobalTraceIdsListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/GlobalTraceIdsListener.java @@ -21,6 +21,10 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener import org.apache.skywalking.apm.network.language.agent.UniqueId; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; +/** + * GlobalTraceIdsListener is the first notified listener in the trace analysis. The notifications include the trace is + * with other segment core info. + */ public interface GlobalTraceIdsListener extends SpanListener { void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/LocalSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/LocalSpanListener.java index 6aa2f1eb01cc241d7529bf50de4c8200b04a6a46..3c2f9fd4665c52479d257e34dcfc8305bd2a711c 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/LocalSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/LocalSpanListener.java @@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; +/** + * SpanListener for local span + */ public interface LocalSpanListener extends SpanListener { void parseLocal(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java index 5614d6a771f53d4214af157e21787ac6e115422a..60652f2afc836d660d656169fd2e6cb3361d3140 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java @@ -18,11 +18,24 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener; +/** + * SpanListener represents the callback when OAP does the trace segment analysis. + */ public interface SpanListener { + /** + * The last step of the analysis process. Typically, the implementations forward the analysis results to the source + * receiver. + */ void build(); + /** + * @return true, if the given point matches the implementation. + */ boolean containsPoint(Point point); + /** + * Analysis point when the analysis core traverses the segment + */ enum Point { Entry, Exit, Local, First, TraceIds } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java index 03a69cabb0d56f939312b27fe5cdf020ba0a2746..1841503464da3e6d915b49620f330a1d33ddff95 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListenerFactory.java @@ -21,6 +21,10 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; +/** + * SpanListenerFactory implementation creates the listener instances when required. Every SpanListener could have its + * own creation factory. + */ public interface SpanListenerFactory { SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java index 41f1b02f78c17c9e595e16db4d8c63ec2f9f8ec8..9151fc4d5b21378aa6e27b0e2e9dbe02c8f0aa5b 100755 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java @@ -54,7 +54,9 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener. import static java.util.Objects.nonNull; /** - * Notice, in here, there are following concepts match + * MultiScopesSpanListener includes the most segment to source(s) logic. + * + * This listener traverses the whole segment. */ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListener, GlobalTraceIdsListener { @@ -98,6 +100,16 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.TraceIds.equals(point); } + /** + * All entry spans are transferred as the Service, Instance and Endpoint related sources. Entry spans are treated on + * the behalf of the observability status of the service reported these spans. + * + * Also, when face the MQ and uninstrumented Gateways, there is different logic to generate the relationship between + * services/instances rather than the normal RPC direct call. The reason is the same, we aren't expecting the agent + * installed in the MQ server, and Gateway may not have suitable agent. Any uninstrumented service if they have the + * capability to forward SkyWalking header through themselves, you could consider the uninstrumented configurations + * to make the topology works to be a whole. + */ @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket(); @@ -118,7 +130,8 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe if (spanDecorator.getSpanLayer().equals(SpanLayer.MQ) || config.getUninstrumentedGatewaysConfig() .isAddressConfiguredAsGateway(address)) { - int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId(serviceIdByPeerId, networkAddressId); + int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId( + serviceIdByPeerId, networkAddressId); sourceBuilder.setSourceServiceInstanceId(instanceIdByPeerId); sourceBuilder.setSourceServiceId(serviceIdByPeerId); } else { @@ -151,6 +164,10 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe this.entrySpanDecorator = spanDecorator; } + /** + * The exit span should be transferred to the service, instance and relationships from the client side detect + * point. + */ @Override public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { if (this.minuteTimeBucket == 0) { @@ -239,8 +256,9 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe } sourceBuilder.setSourceServiceName(serviceInventoryCache.get(sourceBuilder.getSourceServiceId()).getName()); - sourceBuilder.setSourceServiceInstanceName(instanceInventoryCache.get(sourceBuilder.getSourceServiceInstanceId()) - .getName()); + sourceBuilder.setSourceServiceInstanceName( + instanceInventoryCache.get(sourceBuilder.getSourceServiceInstanceId()) + .getName()); if (sourceBuilder.getSourceEndpointId() != Const.NONE) { sourceBuilder.setSourceEndpointName(endpointInventoryCache.get(sourceBuilder.getSourceEndpointId()) .getName()); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java index 6c9c01ad72d617b1645b7c991bd6fe33b521448e..bbb399b7ad3dac58f4585747fa737392070b9cd1 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java @@ -19,15 +19,16 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.language.agent.UniqueId; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache; import org.apache.skywalking.oap.server.core.source.Segment; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.BooleanUtils; -import org.apache.skywalking.oap.server.core.analysis.TimeBucket; import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; @@ -36,13 +37,12 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener. import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * SegmentSpanListener forwards the segment raw data to the persistence layer with the query required conditions. + */ +@Slf4j public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener, GlobalTraceIdsListener { - - private static final Logger logger = LoggerFactory.getLogger(SegmentSpanListener.class); - private final SourceReceiver sourceReceiver; private final TraceSegmentSampler sampler; private final Segment segment = new Segment(); @@ -113,8 +113,8 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener @Override public void build() { - if (logger.isDebugEnabled()) { - logger.debug("segment listener build, segment id: {}", segment.getSegmentId()); + if (log.isDebugEnabled()) { + log.debug("segment listener build, segment id: {}", segment.getSegmentId()); } if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceInstanceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceInstanceMappingSpanListener.java index 4201747e7f4dcd0630f3e1813f6c9455015aef10..6954b3150383d33a4bd855657abfaa71fef6f102 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceInstanceMappingSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceInstanceMappingSpanListener.java @@ -18,8 +18,11 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service; +import java.util.ArrayList; +import java.util.List; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.language.agent.SpanLayer; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; @@ -35,16 +38,15 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; +/** + * Service Instance mapping basically is as same as the service mapping. The network address fetched from the propagated + * context is the alias for the specific service instance. This is just more detailed mapping setup. + * + * Read {@link ServiceMappingSpanListener}. + */ +@Slf4j public class ServiceInstanceMappingSpanListener implements EntrySpanListener { - - private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceMappingSpanListener.class); - private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister; private final TraceServiceModuleConfig config; private final ServiceInventoryCache serviceInventoryCache; @@ -71,21 +73,26 @@ public class ServiceInstanceMappingSpanListener implements EntrySpanListener { @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { - if (logger.isDebugEnabled()) { - logger.debug("service instance mapping listener parse reference"); + if (log.isDebugEnabled()) { + log.debug("service instance mapping listener parse reference"); } if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) { if (spanDecorator.getRefsCount() > 0) { for (int i = 0; i < spanDecorator.getRefsCount(); i++) { int networkAddressId = spanDecorator.getRefs(i).getNetworkAddressId(); String address = networkAddressInventoryCache.get(networkAddressId).getName(); - int serviceInstanceId = serviceInstanceInventoryCache.getServiceInstanceId(serviceInventoryCache.getServiceId(networkAddressId), networkAddressId); + int serviceInstanceId = serviceInstanceInventoryCache.getServiceInstanceId( + serviceInventoryCache.getServiceId(networkAddressId), networkAddressId); if (config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) { - if (logger.isDebugEnabled()) { - logger.debug("{} is configured as gateway, will reset its mapping service instance id", serviceInstanceId); + if (log.isDebugEnabled()) { + log.debug( + "{} is configured as gateway, will reset its mapping service instance id", + serviceInstanceId + ); } - ServiceInstanceInventory instanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId); + ServiceInstanceInventory instanceInventory = serviceInstanceInventoryCache.get( + serviceInstanceId); if (instanceInventory.getMappingServiceInstanceId() != Const.NONE && !serviceInstancesToResetMapping .contains(serviceInstanceId)) { serviceInstancesToResetMapping.add(serviceInstanceId); @@ -104,15 +111,17 @@ public class ServiceInstanceMappingSpanListener implements EntrySpanListener { @Override public void build() { serviceInstanceMappings.forEach(instanceMapping -> { - if (logger.isDebugEnabled()) { - logger.debug("service instance mapping listener build, service id: {}, mapping service id: {}", instanceMapping - .getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId()); + if (log.isDebugEnabled()) { + log.debug( + "service instance mapping listener build, service id: {}, mapping service id: {}", instanceMapping + .getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId()); } - serviceInstanceInventoryRegister.updateMapping(instanceMapping.getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId()); + serviceInstanceInventoryRegister.updateMapping( + instanceMapping.getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId()); }); serviceInstancesToResetMapping.forEach(instanceId -> { - if (logger.isDebugEnabled()) { - logger.debug("service instance mapping listener build, reset mapping of service id: {}", instanceId); + if (log.isDebugEnabled()) { + log.debug("service instance mapping listener build, reset mapping of service id: {}", instanceId); } serviceInstanceInventoryRegister.resetMapping(instanceId); }); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java index df42c5c4289c942db0a6586d86d5f9e798eecf2d..bcfece163e3da3ab0e0e5fee6f5212976d089633 100755 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java @@ -18,8 +18,11 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service; +import java.util.ArrayList; +import java.util.List; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.language.agent.SpanLayer; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; @@ -34,16 +37,13 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; +/** + * ServiceMappingSpanListener includes the specific logic about the concept of service mapping. Service mapping is the + * core idea to make the SkyWalking has good performance and low memory costs, when discovery the big topology. + */ +@Slf4j public class ServiceMappingSpanListener implements EntrySpanListener { - - private static final Logger logger = LoggerFactory.getLogger(ServiceMappingSpanListener.class); - private final IServiceInventoryRegister serviceInventoryRegister; private final TraceServiceModuleConfig config; private final ServiceInventoryCache serviceInventoryCache; @@ -69,10 +69,15 @@ public class ServiceMappingSpanListener implements EntrySpanListener { return Point.Entry.equals(point); } + /** + * Fetch the network address information used at the client side from the propagated context(headers mostly. Besides + * the MQ and uninstrumented services, the the network address will be treated as the alias name of the current + * service. The alias mechanism is the service mapping. + */ @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { - if (logger.isDebugEnabled()) { - logger.debug("service mapping listener parse reference"); + if (log.isDebugEnabled()) { + log.debug("service mapping listener parse reference"); } if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) { @@ -83,11 +88,12 @@ public class ServiceMappingSpanListener implements EntrySpanListener { int serviceId = serviceInventoryCache.getServiceId(networkAddressId); if (config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) { - if (logger.isDebugEnabled()) { - logger.debug("{} is configured as gateway, will reset its mapping service id", serviceId); + if (log.isDebugEnabled()) { + log.debug("{} is configured as gateway, will reset its mapping service id", serviceId); } ServiceInventory serviceInventory = serviceInventoryCache.get(serviceId); - if (serviceInventory.getMappingServiceId() != Const.NONE && !servicesToResetMapping.contains(serviceId)) { + if (serviceInventory.getMappingServiceId() != Const.NONE && !servicesToResetMapping.contains( + serviceId)) { servicesToResetMapping.add(serviceId); } } else { @@ -104,15 +110,18 @@ public class ServiceMappingSpanListener implements EntrySpanListener { @Override public void build() { serviceMappings.forEach(serviceMapping -> { - if (logger.isDebugEnabled()) { - logger.debug("service mapping listener build, service id: {}, mapping service id: {}", serviceMapping.getServiceId(), serviceMapping - .getMappingServiceId()); + if (log.isDebugEnabled()) { + log.debug( + "service mapping listener build, service id: {}, mapping service id: {}", + serviceMapping.getServiceId(), serviceMapping + .getMappingServiceId() + ); } serviceInventoryRegister.updateMapping(serviceMapping.getServiceId(), serviceMapping.getMappingServiceId()); }); servicesToResetMapping.forEach(serviceId -> { - if (logger.isDebugEnabled()) { - logger.debug("service mapping listener build, reset mapping of service id: {}", serviceId); + if (log.isDebugEnabled()) { + log.debug("service mapping listener build, reset mapping of service id: {}", serviceId); } serviceInventoryRegister.resetMapping(serviceId); }); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/IdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/IdExchanger.java index 59963da376ab1258a555bb91e850653d7425591a..8a8420adcaed3b14c6bebf3cd4893f249f74ea66 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/IdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/IdExchanger.java @@ -20,6 +20,20 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standard import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.StandardBuilder; +/** + * The implementation has details to do String to ID(integer) transformation. + */ public interface IdExchanger { + /** + * Register all required fields in the builder to get the assigned IDs. + * + * @param standardBuilder object includes unregistered data. + * @param serviceId service id of this builder. + * @return true if all register completed. NOTICE, because the register is in async mode, mostly because this is a + * distributed register mechanism, check {@link org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor}, + * the required ID could be unreachable as the register is still in processing. But in the production environment, + * besides the moments of the SkyWalking just being setup or new service/instance/endpoint online, all the registers + * should have finished back to when they are accessed at the first time. This register could process very fast. + */ boolean exchange(T standardBuilder, int serviceId); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java index d3730391619cf14b3e5172ae18b79e14cba05067..911c922043e5cb7da360f306691d31039e7e6c39 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization; import com.google.common.base.Strings; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.core.Const; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; @@ -27,13 +28,16 @@ import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInv import org.apache.skywalking.oap.server.core.source.DetectPoint; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * Register the information inside the segment reference. All of them are downstream(caller) service information. + * Reference could include multiple rows, as this span could have multiple downstream, such as batch process, typically + * MQ consumer. + * + * Check the Cross Process Propagation Headers Protocol v2 for the details in the references. + */ +@Slf4j public class ReferenceIdExchanger implements IdExchanger { - - private static final Logger logger = LoggerFactory.getLogger(ReferenceIdExchanger.class); - private static ReferenceIdExchanger EXCHANGER; private final IEndpointInventoryRegister endpointInventoryRegister; private final ServiceInstanceInventoryCache serviceInstanceInventoryCache; @@ -63,14 +67,18 @@ public class ReferenceIdExchanger implements IdExchanger { boolean exchanged = true; if (standardBuilder.getEntryEndpointId() == 0) { - String entryEndpointName = Strings.isNullOrEmpty(standardBuilder.getEntryEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder + String entryEndpointName = Strings.isNullOrEmpty( + standardBuilder.getEntryEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder .getEntryEndpointName(); int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()) .getServiceId(); int entryEndpointId = getEndpointId(entryServiceId, entryEndpointName); if (entryEndpointId == 0) { - if (logger.isDebugEnabled()) { - logger.debug("entry endpoint name: {} from service id: {} exchange failed", entryEndpointName, entryServiceId); + if (log.isDebugEnabled()) { + log.debug( + "entry endpoint name: {} from service id: {} exchange failed", entryEndpointName, + entryServiceId + ); } exchanged = false; @@ -86,15 +94,19 @@ public class ReferenceIdExchanger implements IdExchanger { } if (standardBuilder.getParentEndpointId() == 0) { - String parentEndpointName = Strings.isNullOrEmpty(standardBuilder.getParentEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder + String parentEndpointName = Strings.isNullOrEmpty( + standardBuilder.getParentEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder .getParentEndpointName(); int parentServiceId = serviceInstanceInventoryCache.get(standardBuilder.getParentServiceInstanceId()) .getServiceId(); int parentEndpointId = getEndpointId(parentServiceId, parentEndpointName); if (parentEndpointId == 0) { - if (logger.isDebugEnabled()) { - logger.debug("parent endpoint name: {} from service id: {} exchange failed", parentEndpointName, parentServiceId); + if (log.isDebugEnabled()) { + log.debug( + "parent endpoint name: {} from service id: {} exchange failed", parentEndpointName, + parentServiceId + ); } exchanged = false; @@ -110,11 +122,15 @@ public class ReferenceIdExchanger implements IdExchanger { } if (standardBuilder.getNetworkAddressId() == 0 && !Strings.isNullOrEmpty(standardBuilder.getNetworkAddress())) { - int networkAddressId = networkAddressInventoryRegister.getOrCreate(standardBuilder.getNetworkAddress(), null); + int networkAddressId = networkAddressInventoryRegister.getOrCreate( + standardBuilder.getNetworkAddress(), null); if (networkAddressId == 0) { - if (logger.isDebugEnabled()) { - logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId); + if (log.isDebugEnabled()) { + log.debug( + "network getAddress: {} from service id: {} exchange failed", + standardBuilder.getNetworkAddress(), serviceId + ); } exchanged = false; diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanExchanger.java index 9d6f534ef789c81c5a32b8b139812fabf1ba2a4c..c0383b2b12aa91bf5282705ec1e30c17a955ff4f 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanExchanger.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanExchanger.java @@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standard import com.google.common.base.Strings; import com.google.gson.JsonObject; +import java.util.List; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.common.KeyStringValuePair; import org.apache.skywalking.apm.network.language.agent.SpanLayer; import org.apache.skywalking.apm.network.language.agent.SpanType; @@ -38,15 +40,15 @@ import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryR import org.apache.skywalking.oap.server.core.source.DetectPoint; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; +/** + * SpanExchanger process the segment owner(service/instance) ID register, including operation name and network address. + * + * @since 6.6.0 only the operation name of entry span is registered as the endpoint, others keep the operation name as + * the literal string. + */ +@Slf4j public class SpanExchanger implements IdExchanger { - - private static final Logger logger = LoggerFactory.getLogger(SpanExchanger.class); - private static SpanExchanger EXCHANGER; private final ServiceInventoryCache serviceInventoryCacheDAO; private final IServiceInventoryRegister serviceInventoryRegister; @@ -95,8 +97,9 @@ public class SpanExchanger implements IdExchanger { int componentId = componentLibraryCatalogService.getComponentId(standardBuilder.getComponent()); if (componentId == 0) { - if (logger.isDebugEnabled()) { - logger.debug("component: {} in service: {} exchange failed", standardBuilder.getComponent(), serviceId); + if (log.isDebugEnabled()) { + log.debug( + "component: {} in service: {} exchange failed", standardBuilder.getComponent(), serviceId); } exchanged = false; @@ -109,11 +112,12 @@ public class SpanExchanger implements IdExchanger { int peerId = standardBuilder.getPeerId(); if (peerId == 0 && !Strings.isNullOrEmpty(standardBuilder.getPeer())) { - peerId = networkAddressInventoryRegister.getOrCreate(standardBuilder.getPeer(), buildServiceProperties(standardBuilder)); + peerId = networkAddressInventoryRegister.getOrCreate( + standardBuilder.getPeer(), buildServiceProperties(standardBuilder)); if (peerId == Const.NONE) { - if (logger.isDebugEnabled()) { - logger.debug("peer: {} in service: {} exchange failed", standardBuilder.getPeer(), serviceId); + if (log.isDebugEnabled()) { + log.debug("peer: {} in service: {} exchange failed", standardBuilder.getPeer(), serviceId); } exchanged = false; @@ -135,7 +139,8 @@ public class SpanExchanger implements IdExchanger { * it will only be updated at the first time for now. */ JsonObject properties = null; - ServiceInventory newServiceInventory = serviceInventoryCacheDAO.get(serviceInventoryCacheDAO.getServiceId(peerId)); + ServiceInventory newServiceInventory = serviceInventoryCacheDAO.get( + serviceInventoryCacheDAO.getServiceId(peerId)); if (SpanLayer.Database.equals(standardBuilder.getSpanLayer())) { if (!newServiceInventory.hasProperties()) { properties = buildServiceProperties(standardBuilder); @@ -143,8 +148,9 @@ public class SpanExchanger implements IdExchanger { } serviceInventoryRegister.update(newServiceInventory.getSequence(), nodeType, properties); - ServiceInstanceInventory newServiceInstanceInventory = serviceInstanceInventoryCacheDAO.get(serviceInstanceInventoryCacheDAO - .getServiceInstanceId(newServiceInventory.getSequence(), peerId)); + ServiceInstanceInventory newServiceInstanceInventory = serviceInstanceInventoryCacheDAO.get( + serviceInstanceInventoryCacheDAO + .getServiceInstanceId(newServiceInventory.getSequence(), peerId)); serviceInstanceInventoryRegister.update(newServiceInstanceInventory.getSequence(), nodeType, properties); } @@ -154,14 +160,16 @@ public class SpanExchanger implements IdExchanger { * so, since 6.6.0, only it triggers register. */ if (SpanType.Entry.equals(standardBuilder.getSpanType())) { - String endpointName = Strings.isNullOrEmpty(standardBuilder.getOperationName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder + String endpointName = Strings.isNullOrEmpty( + standardBuilder.getOperationName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder .getOperationName(); - int endpointId = endpointInventoryRegister.getOrCreate(serviceId, endpointName, DetectPoint.fromSpanType(standardBuilder - .getSpanType())); + int endpointId = endpointInventoryRegister.getOrCreate( + serviceId, endpointName, DetectPoint.fromSpanType(standardBuilder + .getSpanType())); if (endpointId == 0) { - if (logger.isDebugEnabled()) { - logger.debug("endpoint name: {} from service id: {} exchange failed", endpointName, serviceId); + if (log.isDebugEnabled()) { + log.debug("endpoint name: {} from service id: {} exchange failed", endpointName, serviceId); } exchanged = false;