未验证 提交 ae442e36 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

More comments for important classes (#4361)

上级 5f4b566c
......@@ -18,14 +18,18 @@
package org.apache.skywalking.apm.agent.core.boot;
import java.net.URISyntaxException;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
/**
* AgentPackagePath is a flag and finder to locate the SkyWalking agent.jar. It gets the absolute path of the agent jar.
* The path is the required metadata for agent core looking up the plugins and toolkit activations. If the lookup
* mechanism fails, the agent will exit directly.
*/
public class AgentPackagePath {
private static final ILog logger = LogManager.getLogger(AgentPackagePath.class);
......@@ -67,7 +71,8 @@ public class AgentPackagePath {
}
} else {
int prefixLength = "file:".length();
String classLocation = urlString.substring(prefixLength, urlString.length() - classResourcePath.length());
String classLocation = urlString.substring(
prefixLength, urlString.length() - classResourcePath.length());
return new File(classLocation);
}
}
......
......@@ -117,7 +117,25 @@ public class CoreModuleConfig extends ModuleConfig {
return dataTTLConfig;
}
/**
* OAP server could work in different roles.
*/
public enum Role {
Mixed, Receiver, Aggregator
/**
* Default role. OAP works as the {@link #Receiver} and {@link #Aggregator}
*/
Mixed,
/**
* Receiver mode OAP open the service to the agents, analysis and aggregate the results and forward the results
* to {@link #Mixed} and {@link #Aggregator} roles OAP. The only exception is for {@link
* org.apache.skywalking.oap.server.core.analysis.record.Record}, they don't require 2nd round distributed
* aggregation, is being pushed into the storage from the receiver OAP directly.
*/
Receiver,
/**
* Aggregator mode OAP receives data from {@link #Mixed} and {@link #Aggregator} OAP nodes, and do 2nd round
* aggregation. Then save the final result to the storage.
*/
Aggregator
}
}
......@@ -268,10 +268,10 @@ public class CoreModuleProvider extends ModuleProvider {
if (CoreModuleConfig.Role.Mixed.name()
.equalsIgnoreCase(
moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name()
.equalsIgnoreCase(
moduleConfig
.getRole())) {
moduleConfig.getRole())
|| CoreModuleConfig.Role.Aggregator.name()
.equalsIgnoreCase(
moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(
new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager()
......
......@@ -20,6 +20,15 @@ package org.apache.skywalking.oap.server.core.analysis;
import org.apache.skywalking.oap.server.core.source.Source;
/**
* SourceDispatcher implementation processes different types of the source. There are two kinds of the source
* dispatcher. All implementations are doing field values set/get to transfer the data to the streaming process.
*
* One is hard coded, which could be found through the hierarchy tree. The other is generated by OAL engine based on the
* templates insides oal-rt/src/main/resources/code-templates/dispatcher
*
* @param <SOURCE> the data type of this dispatcher processes.
*/
public interface SourceDispatcher<SOURCE extends Source> {
void dispatch(SOURCE source);
}
......@@ -20,6 +20,10 @@ package org.apache.skywalking.oap.server.core.source;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* The source receiver implementation delegates to {@link org.apache.skywalking.oap.server.core.analysis.DispatcherManager}
* in order to forward source to the suitable real {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}.
*/
public interface SourceReceiver extends Service {
void receive(Source source);
}
......@@ -28,6 +28,11 @@ import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataStream provides the open APIs for the local file based cache system.
*
* @param <MESSAGE_TYPE> type of data in the cache file.
*/
class DataStream<MESSAGE_TYPE extends GeneratedMessageV3> {
private static final Logger logger = LoggerFactory.getLogger(DataStream.class);
......
......@@ -29,17 +29,23 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataStreamReader represents the reader of the local file based cache provided by {@link DataStream}. It reads the
* data in the local cached file, and triggers the registered callback to process, also, provide the retry if the
* callback responses the process status is unsuccessful.
*
* This callback/retry mechanism is used in inventory register for multiple receivers.
*
* @param <MESSAGE_TYPE> type of data in the cache file.
*/
@Slf4j
public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private static final Logger logger = LoggerFactory.getLogger(DataStreamReader.class);
private final File directory;
private final Offset.ReadOffset readOffset;
private final Parser<MESSAGE_TYPE> parser;
......@@ -50,7 +56,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private InputStream inputStream;
DataStreamReader(File directory, Offset.ReadOffset readOffset, Parser<MESSAGE_TYPE> parser,
CallBack<MESSAGE_TYPE> callBack) {
CallBack<MESSAGE_TYPE> callBack) {
this.directory = directory;
this.readOffset = readOffset;
this.parser = parser;
......@@ -62,7 +68,8 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
preRead();
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::read, t -> logger.error("Buffer data pre read failure.", t)), 3, 1, TimeUnit.SECONDS);
.scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::read, t -> log.error(
"Buffer data pre read failure.", t)), 3, 1, TimeUnit.SECONDS);
}
private void preRead() {
......@@ -76,7 +83,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
try {
inputStream.skip(readOffset.getOffset());
} catch (IOException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
} else {
openInputStream(readEarliestDataFile());
......@@ -93,7 +100,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
inputStream = new FileInputStream(readingFile);
} catch (IOException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
......@@ -111,8 +118,8 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
}
private void read() {
if (logger.isDebugEnabled()) {
logger.debug("Read buffer data");
if (log.isDebugEnabled()) {
log.debug("Read buffer data");
}
try {
......@@ -137,8 +144,8 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
bufferDataCollection.add(bufferData);
}
if (logger.isDebugEnabled()) {
logger.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize);
if (log.isDebugEnabled()) {
log.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize);
}
} else if (bufferDataCollection.size() > 0) {
reCall();
......@@ -146,7 +153,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
}
......@@ -155,7 +162,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
reCall();
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
......@@ -175,7 +182,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
} else {
break;
......@@ -183,6 +190,11 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
}
}
/**
* Callback when reader fetched data from the local file.
*
* @param <MESSAGE_TYPE> type of data in the cache file.
*/
public interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
boolean call(BufferData<MESSAGE_TYPE> bufferData);
}
......
......@@ -23,15 +23,18 @@ import com.google.protobuf.GeneratedMessageV3;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.skywalking.apm.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DataStreamReader represents the writer of the local file based cache provided by {@link DataStream}. It writes the
* given messages into the local files, and create new if necessary or file max size reached.
*
* @param <MESSAGE_TYPE> type of data in the cache file.
*/
@Slf4j
class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
private static final Logger logger = LoggerFactory.getLogger(DataStreamWriter.class);
private final File directory;
private final Offset.WriteOffset writeOffset;
......@@ -71,9 +74,9 @@ class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
boolean created = writingFile.createNewFile();
if (!created) {
logger.info("The file named {} already exists.", writingFile.getAbsolutePath());
log.info("The file named {} already exists.", writingFile.getAbsolutePath());
} else {
logger.info("Create a new buffer data file: {}", writingFile.getAbsolutePath());
log.info("Create a new buffer data file: {}", writingFile.getAbsolutePath());
}
writeOffset.setOffset(0);
......@@ -93,7 +96,7 @@ class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
outputStream = FileUtils.openOutputStream(writingFile, true);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
}
}
}
......@@ -22,6 +22,9 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.apm.util.StringUtil;
/**
* File content offset definition. Offset is the pointer when read/write the local file.
*/
class Offset {
private static final String SPLIT_CHARACTER = ",";
......@@ -35,11 +38,20 @@ class Offset {
readOffset = new ReadOffset(writeOffset);
}
/**
* @return the offset data into a single literal string for the persistence.
*/
String serialize() {
return readOffset.getFileName() + SPLIT_CHARACTER + String.valueOf(readOffset.getOffset()) + SPLIT_CHARACTER + writeOffset
return readOffset.getFileName() + SPLIT_CHARACTER + String.valueOf(
readOffset.getOffset()) + SPLIT_CHARACTER + writeOffset
.getFileName() + SPLIT_CHARACTER + String.valueOf(writeOffset.getOffset());
}
/**
* Initialize the Offset object by given value.
*
* @param value serialized Offset
*/
void deserialize(String value) {
if (!StringUtil.isEmpty(value)) {
String[] values = value.split(SPLIT_CHARACTER);
......
......@@ -34,6 +34,10 @@ import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* OffsetStream is driven by the internal timer. Flush the hold read and write offset into the file. And restore the
* data from the same file in the initialization process.o
*/
class OffsetStream {
private static final Logger logger = LoggerFactory.getLogger(OffsetStream.class);
......@@ -79,7 +83,9 @@ class OffsetStream {
initialized = true;
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this::flush, t -> logger.error("Flush offset file in background failure.", t)), 2, 1, TimeUnit.SECONDS);
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::flush, t -> logger.error(
"Flush offset file in background failure.", t)), 2, 1, TimeUnit.SECONDS);
}
}
......@@ -122,7 +128,8 @@ class OffsetStream {
}
private String readLastLine() throws IOException {
ReversedLinesFileReader reader = new ReversedLinesFileReader(offsetFile, Charset.forName(BufferFileUtils.CHARSET));
ReversedLinesFileReader reader = new ReversedLinesFileReader(
offsetFile, Charset.forName(BufferFileUtils.CHARSET));
return reader.readLine();
}
}
......@@ -20,6 +20,10 @@ package org.apache.skywalking.oap.server.library.util;
import lombok.Getter;
/**
* GRPCStreamStatus is used for gRPC streaming client. It helps to make sure the gRPC client to wait the last streaming
* has the onCompleted or onError confirmation.
*/
@Getter
public class GRPCStreamStatus {
......
......@@ -26,9 +26,15 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
@Setter
public class SharingServerConfig extends ModuleConfig {
private String restHost;
/**
* Only setting the real port(not 0) makes the jetty server online.
*/
private int restPort;
private String restContextPath;
private String gRPCHost;
/**
* Only setting the real port(not 0) makes the gRPC server online.
*/
private int gRPCPort;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
......
......@@ -22,6 +22,16 @@ import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* Sharing server is an independent gRPC and Jetty servers provided for all receiver modules. In default, this module
* would not be activated unless the user active explicitly. It only delegates the core gRPC and Jetty servers.
*
* Once it is activated, provides separated servers, then all receivers use these to accept outside requests. Typical,
* this is activated to avoid the ip, port and thread pool sharing between receiver and internal traffics. For security
* consideration, receiver should open TLS and token check, and internal(remote module) traffic should base on trusted
* network, no TLS and token check. Even some companies may require TLS internally, it still use different TLS keys. In
* this specific case, we recommend users to consider use {@link org.apache.skywalking.oap.server.core.CoreModuleConfig.Role}.
*/
public class SharingServerModule extends ModuleDefine {
public static final String NAME = "receiver-sharing-server";
......
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* Service of trace segment parser.
*/
public interface ISegmentParserService extends Service {
void send(UpstreamSegment segment);
}
......@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.ProtocolVersion;
import org.apache.skywalking.apm.network.language.agent.SpanType;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
......@@ -53,16 +54,24 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SegmentParseV2 is a replication of SegmentParse, but be compatible with v2 trace protocol.
* SegmentParseV2 replaced the SegmentParse(V1 is before 6.0.0) to drive the segment analysis. It includes the following
* steps
*
* 1. Register data, name to ID register
*
* 2. If register unfinished, cache in the local buffer file. And back to (1).
*
* 3. If register finished, traverse the span and analysis by the given {@link SpanListener}s.
*
* 4. Notify the build event to all {@link SpanListener}s in order to forward all built sources into dispatchers.
*
* @since 6.0.0 In the 6.x, the V1 and V2 analysis both exist.
* @since 7.0.0 SegmentParse(V1) has been removed permanently.
*/
@Slf4j
public class SegmentParseV2 {
private static final Logger logger = LoggerFactory.getLogger(SegmentParseV2.class);
private final ModuleManager moduleManager;
private final List<SpanListener> spanListeners;
private final SegmentParserListenerManager listenerManager;
......@@ -76,7 +85,7 @@ public class SegmentParseV2 {
private volatile static CounterMetrics TRACE_PARSE_ERROR;
private SegmentParseV2(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
TraceServiceModuleConfig config) {
TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.spanListeners = new LinkedList<>();
......@@ -90,9 +99,19 @@ public class SegmentParseV2 {
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
TRACE_BUFFER_FILE_RETRY = metricsCreator.createCounter("v6_trace_buffer_file_retry", "The number of retry trace segment from the buffer file, but haven't registered successfully.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
TRACE_BUFFER_FILE_OUT = metricsCreator.createCounter("v6_trace_buffer_file_out", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
TRACE_PARSE_ERROR = metricsCreator.createCounter("v6_trace_parse_error", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
TRACE_BUFFER_FILE_RETRY = metricsCreator.createCounter(
"v6_trace_buffer_file_retry",
"The number of retry trace segment from the buffer file, but haven't registered successfully.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
TRACE_BUFFER_FILE_OUT = metricsCreator.createCounter(
"v6_trace_buffer_file_out", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);
TRACE_PARSE_ERROR = metricsCreator.createCounter(
"v6_trace_parse_error", "The number of trace segment out of the buffer file", MetricsTag.EMPTY_KEY,
MetricsTag.EMPTY_VALUE
);
}
this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME)
......@@ -116,15 +135,19 @@ public class SegmentParseV2 {
// Recheck in case that the segment comes from file buffer
final int serviceInstanceId = segmentObject.getServiceInstanceId();
if (serviceInstanceInventoryCache.get(serviceInstanceId) == null) {
logger.warn("Cannot recognize service instance id [{}] from cache, segment will be ignored", serviceInstanceId);
log.warn(
"Cannot recognize service instance id [{}] from cache, segment will be ignored", serviceInstanceId);
return true; // to mark it "completed" thus won't be retried
}
SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject);
if (!preBuild(traceIds, segmentDecorator)) {
if (logger.isDebugEnabled()) {
logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId());
if (log.isDebugEnabled()) {
log.debug(
"This segment id exchange not success, write to buffer file, id: {}",
segmentCoreInfo.getSegmentId()
);
}
if (source.equals(SegmentSource.Agent)) {
......@@ -135,8 +158,8 @@ public class SegmentParseV2 {
}
return false;
} else {
if (logger.isDebugEnabled()) {
logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
if (log.isDebugEnabled()) {
log.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
}
notifyListenerToBuild();
......@@ -144,7 +167,7 @@ public class SegmentParseV2 {
}
} catch (Throwable e) {
TRACE_PARSE_ERROR.inc();
logger.error(e.getMessage(), e);
log.error(e.getMessage(), e);
return true;
}
}
......@@ -213,8 +236,8 @@ public class SegmentParseV2 {
} else if (SpanType.Local.equals(spanDecorator.getSpanType())) {
notifyLocalListener(spanDecorator);
} else {
logger.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType()
.name());
log.error("span type value was unexpected, span type name: {}", spanDecorator.getSpanType()
.name());
}
}
}
......@@ -223,8 +246,8 @@ public class SegmentParseV2 {
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
if (logger.isDebugEnabled()) {
logger.debug("push to segment buffer write worker, id: {}", id);
if (log.isDebugEnabled()) {
log.debug("push to segment buffer write worker, id: {}", id);
}
SegmentStandardization standardization = new SegmentStandardization(id);
......@@ -279,7 +302,8 @@ public class SegmentParseV2 {
private void createSpanListeners() {
listenerManager.getSpanListenerFactories()
.forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
.forEach(
spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager, config)));
}
public static class Producer implements DataStreamReader.CallBack<UpstreamSegment> {
......@@ -291,7 +315,7 @@ public class SegmentParseV2 {
private final TraceServiceModuleConfig config;
public Producer(ModuleManager moduleManager, SegmentParserListenerManager listenerManager,
TraceServiceModuleConfig config) {
TraceServiceModuleConfig config) {
this.moduleManager = moduleManager;
this.listenerManager = listenerManager;
this.config = config;
......
......@@ -20,6 +20,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
/**
* The open service to the receivers. Segment parser for v2 trace protocol.
*/
public class SegmentParserServiceImpl implements ISegmentParserService {
private final SegmentParseV2.Producer segmentProducer;
......
......@@ -18,6 +18,16 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
/**
* The segment source.
*/
public enum SegmentSource {
Agent, Buffer
/**
* From the client side agent.
*/
Agent,
/**
* From the buffer file, because the last time register has not be successful.
*/
Buffer
}
......@@ -26,6 +26,10 @@ import org.apache.skywalking.apm.network.language.agent.v2.SpanObjectV2;
import static java.util.Objects.isNull;
/**
* SpanDecorator is used in the metadata register process, and provides an easy access way consistently, no matter
* before or after the register.
*/
public class SpanDecorator implements StandardBuilder {
private boolean isOrigin = true;
private final StandardBuilder standardBuilder;
......
......@@ -18,6 +18,12 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
/**
* The implementation supports immutable data format to mutable builder transformation.
*/
public interface StandardBuilder {
/**
* Change the status to be mutable if it hasn't. Keep in the mutable status when this is called multiple times.
*/
void toBuilder();
}
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
/**
* SpanListener for Entry span.
*/
public interface EntrySpanListener extends SpanListener {
void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
\ No newline at end of file
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
/**
* SpanListener for exit span.
*/
public interface ExitSpanListener extends SpanListener {
void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
/**
* SpanListener for the first span in the segment. The first span means span id is 0.
*/
public interface FirstSpanListener extends SpanListener {
void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
......@@ -21,6 +21,10 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
/**
* GlobalTraceIdsListener is the first notified listener in the trace analysis. The notifications include the trace is
* with other segment core info.
*/
public interface GlobalTraceIdsListener extends SpanListener {
void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo);
}
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
/**
* SpanListener for local span
*/
public interface LocalSpanListener extends SpanListener {
void parseLocal(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo);
}
......@@ -18,11 +18,24 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
/**
* SpanListener represents the callback when OAP does the trace segment analysis.
*/
public interface SpanListener {
/**
* The last step of the analysis process. Typically, the implementations forward the analysis results to the source
* receiver.
*/
void build();
/**
* @return true, if the given point matches the implementation.
*/
boolean containsPoint(Point point);
/**
* Analysis point when the analysis core traverses the segment
*/
enum Point {
Entry, Exit, Local, First, TraceIds
}
......
......@@ -21,6 +21,10 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
/**
* SpanListenerFactory implementation creates the listener instances when required. Every SpanListener could have its
* own creation factory.
*/
public interface SpanListenerFactory {
SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config);
}
......@@ -54,7 +54,9 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.
import static java.util.Objects.nonNull;
/**
* Notice, in here, there are following concepts match
* MultiScopesSpanListener includes the most segment to source(s) logic.
*
* This listener traverses the whole segment.
*/
public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListener, GlobalTraceIdsListener {
......@@ -98,6 +100,16 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.TraceIds.equals(point);
}
/**
* All entry spans are transferred as the Service, Instance and Endpoint related sources. Entry spans are treated on
* the behalf of the observability status of the service reported these spans.
*
* Also, when face the MQ and uninstrumented Gateways, there is different logic to generate the relationship between
* services/instances rather than the normal RPC direct call. The reason is the same, we aren't expecting the agent
* installed in the MQ server, and Gateway may not have suitable agent. Any uninstrumented service if they have the
* capability to forward SkyWalking header through themselves, you could consider the uninstrumented configurations
* to make the topology works to be a whole.
*/
@Override
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket();
......@@ -118,7 +130,8 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
if (spanDecorator.getSpanLayer().equals(SpanLayer.MQ) || config.getUninstrumentedGatewaysConfig()
.isAddressConfiguredAsGateway(address)) {
int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId(serviceIdByPeerId, networkAddressId);
int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId(
serviceIdByPeerId, networkAddressId);
sourceBuilder.setSourceServiceInstanceId(instanceIdByPeerId);
sourceBuilder.setSourceServiceId(serviceIdByPeerId);
} else {
......@@ -151,6 +164,10 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
this.entrySpanDecorator = spanDecorator;
}
/**
* The exit span should be transferred to the service, instance and relationships from the client side detect
* point.
*/
@Override
public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
if (this.minuteTimeBucket == 0) {
......@@ -239,8 +256,9 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
}
sourceBuilder.setSourceServiceName(serviceInventoryCache.get(sourceBuilder.getSourceServiceId()).getName());
sourceBuilder.setSourceServiceInstanceName(instanceInventoryCache.get(sourceBuilder.getSourceServiceInstanceId())
.getName());
sourceBuilder.setSourceServiceInstanceName(
instanceInventoryCache.get(sourceBuilder.getSourceServiceInstanceId())
.getName());
if (sourceBuilder.getSourceEndpointId() != Const.NONE) {
sourceBuilder.setSourceEndpointName(endpointInventoryCache.get(sourceBuilder.getSourceEndpointId())
.getName());
......
......@@ -19,15 +19,16 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.UniqueId;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.source.Segment;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
......@@ -36,13 +37,12 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SegmentSpanListener forwards the segment raw data to the persistence layer with the query required conditions.
*/
@Slf4j
public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener, GlobalTraceIdsListener {
private static final Logger logger = LoggerFactory.getLogger(SegmentSpanListener.class);
private final SourceReceiver sourceReceiver;
private final TraceSegmentSampler sampler;
private final Segment segment = new Segment();
......@@ -113,8 +113,8 @@ public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener
@Override
public void build() {
if (logger.isDebugEnabled()) {
logger.debug("segment listener build, segment id: {}", segment.getSegmentId());
if (log.isDebugEnabled()) {
log.debug("segment listener build, segment id: {}", segment.getSegmentId());
}
if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
......
......@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
......@@ -35,16 +38,15 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* Service Instance mapping basically is as same as the service mapping. The network address fetched from the propagated
* context is the alias for the specific service instance. This is just more detailed mapping setup.
*
* Read {@link ServiceMappingSpanListener}.
*/
@Slf4j
public class ServiceInstanceMappingSpanListener implements EntrySpanListener {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceMappingSpanListener.class);
private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
private final TraceServiceModuleConfig config;
private final ServiceInventoryCache serviceInventoryCache;
......@@ -71,21 +73,26 @@ public class ServiceInstanceMappingSpanListener implements EntrySpanListener {
@Override
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
if (logger.isDebugEnabled()) {
logger.debug("service instance mapping listener parse reference");
if (log.isDebugEnabled()) {
log.debug("service instance mapping listener parse reference");
}
if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
int networkAddressId = spanDecorator.getRefs(i).getNetworkAddressId();
String address = networkAddressInventoryCache.get(networkAddressId).getName();
int serviceInstanceId = serviceInstanceInventoryCache.getServiceInstanceId(serviceInventoryCache.getServiceId(networkAddressId), networkAddressId);
int serviceInstanceId = serviceInstanceInventoryCache.getServiceInstanceId(
serviceInventoryCache.getServiceId(networkAddressId), networkAddressId);
if (config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) {
if (logger.isDebugEnabled()) {
logger.debug("{} is configured as gateway, will reset its mapping service instance id", serviceInstanceId);
if (log.isDebugEnabled()) {
log.debug(
"{} is configured as gateway, will reset its mapping service instance id",
serviceInstanceId
);
}
ServiceInstanceInventory instanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
ServiceInstanceInventory instanceInventory = serviceInstanceInventoryCache.get(
serviceInstanceId);
if (instanceInventory.getMappingServiceInstanceId() != Const.NONE && !serviceInstancesToResetMapping
.contains(serviceInstanceId)) {
serviceInstancesToResetMapping.add(serviceInstanceId);
......@@ -104,15 +111,17 @@ public class ServiceInstanceMappingSpanListener implements EntrySpanListener {
@Override
public void build() {
serviceInstanceMappings.forEach(instanceMapping -> {
if (logger.isDebugEnabled()) {
logger.debug("service instance mapping listener build, service id: {}, mapping service id: {}", instanceMapping
.getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId());
if (log.isDebugEnabled()) {
log.debug(
"service instance mapping listener build, service id: {}, mapping service id: {}", instanceMapping
.getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId());
}
serviceInstanceInventoryRegister.updateMapping(instanceMapping.getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId());
serviceInstanceInventoryRegister.updateMapping(
instanceMapping.getServiceInstanceId(), instanceMapping.getMappingServiceInstanceId());
});
serviceInstancesToResetMapping.forEach(instanceId -> {
if (logger.isDebugEnabled()) {
logger.debug("service instance mapping listener build, reset mapping of service id: {}", instanceId);
if (log.isDebugEnabled()) {
log.debug("service instance mapping listener build, reset mapping of service id: {}", instanceId);
}
serviceInstanceInventoryRegister.resetMapping(instanceId);
});
......
......@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
......@@ -34,16 +37,13 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* ServiceMappingSpanListener includes the specific logic about the concept of service mapping. Service mapping is the
* core idea to make the SkyWalking has good performance and low memory costs, when discovery the big topology.
*/
@Slf4j
public class ServiceMappingSpanListener implements EntrySpanListener {
private static final Logger logger = LoggerFactory.getLogger(ServiceMappingSpanListener.class);
private final IServiceInventoryRegister serviceInventoryRegister;
private final TraceServiceModuleConfig config;
private final ServiceInventoryCache serviceInventoryCache;
......@@ -69,10 +69,15 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
return Point.Entry.equals(point);
}
/**
* Fetch the network address information used at the client side from the propagated context(headers mostly. Besides
* the MQ and uninstrumented services, the the network address will be treated as the alias name of the current
* service. The alias mechanism is the service mapping.
*/
@Override
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
if (logger.isDebugEnabled()) {
logger.debug("service mapping listener parse reference");
if (log.isDebugEnabled()) {
log.debug("service mapping listener parse reference");
}
if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
......@@ -83,11 +88,12 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
int serviceId = serviceInventoryCache.getServiceId(networkAddressId);
if (config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) {
if (logger.isDebugEnabled()) {
logger.debug("{} is configured as gateway, will reset its mapping service id", serviceId);
if (log.isDebugEnabled()) {
log.debug("{} is configured as gateway, will reset its mapping service id", serviceId);
}
ServiceInventory serviceInventory = serviceInventoryCache.get(serviceId);
if (serviceInventory.getMappingServiceId() != Const.NONE && !servicesToResetMapping.contains(serviceId)) {
if (serviceInventory.getMappingServiceId() != Const.NONE && !servicesToResetMapping.contains(
serviceId)) {
servicesToResetMapping.add(serviceId);
}
} else {
......@@ -104,15 +110,18 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
@Override
public void build() {
serviceMappings.forEach(serviceMapping -> {
if (logger.isDebugEnabled()) {
logger.debug("service mapping listener build, service id: {}, mapping service id: {}", serviceMapping.getServiceId(), serviceMapping
.getMappingServiceId());
if (log.isDebugEnabled()) {
log.debug(
"service mapping listener build, service id: {}, mapping service id: {}",
serviceMapping.getServiceId(), serviceMapping
.getMappingServiceId()
);
}
serviceInventoryRegister.updateMapping(serviceMapping.getServiceId(), serviceMapping.getMappingServiceId());
});
servicesToResetMapping.forEach(serviceId -> {
if (logger.isDebugEnabled()) {
logger.debug("service mapping listener build, reset mapping of service id: {}", serviceId);
if (log.isDebugEnabled()) {
log.debug("service mapping listener build, reset mapping of service id: {}", serviceId);
}
serviceInventoryRegister.resetMapping(serviceId);
});
......
......@@ -20,6 +20,20 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standard
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.StandardBuilder;
/**
* The implementation has details to do String to ID(integer) transformation.
*/
public interface IdExchanger<T extends StandardBuilder> {
/**
* Register all required fields in the builder to get the assigned IDs.
*
* @param standardBuilder object includes unregistered data.
* @param serviceId service id of this builder.
* @return true if all register completed. NOTICE, because the register is in async mode, mostly because this is a
* distributed register mechanism, check {@link org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor},
* the required ID could be unreachable as the register is still in processing. But in the production environment,
* besides the moments of the SkyWalking just being setup or new service/instance/endpoint online, all the registers
* should have finished back to when they are accessed at the first time. This register could process very fast.
*/
boolean exchange(T standardBuilder, int serviceId);
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
......@@ -27,13 +28,16 @@ import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInv
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Register the information inside the segment reference. All of them are downstream(caller) service information.
* Reference could include multiple rows, as this span could have multiple downstream, such as batch process, typically
* MQ consumer.
*
* Check the Cross Process Propagation Headers Protocol v2 for the details in the references.
*/
@Slf4j
public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
private static final Logger logger = LoggerFactory.getLogger(ReferenceIdExchanger.class);
private static ReferenceIdExchanger EXCHANGER;
private final IEndpointInventoryRegister endpointInventoryRegister;
private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
......@@ -63,14 +67,18 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
boolean exchanged = true;
if (standardBuilder.getEntryEndpointId() == 0) {
String entryEndpointName = Strings.isNullOrEmpty(standardBuilder.getEntryEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder
String entryEndpointName = Strings.isNullOrEmpty(
standardBuilder.getEntryEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder
.getEntryEndpointName();
int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId())
.getServiceId();
int entryEndpointId = getEndpointId(entryServiceId, entryEndpointName);
if (entryEndpointId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("entry endpoint name: {} from service id: {} exchange failed", entryEndpointName, entryServiceId);
if (log.isDebugEnabled()) {
log.debug(
"entry endpoint name: {} from service id: {} exchange failed", entryEndpointName,
entryServiceId
);
}
exchanged = false;
......@@ -86,15 +94,19 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
if (standardBuilder.getParentEndpointId() == 0) {
String parentEndpointName = Strings.isNullOrEmpty(standardBuilder.getParentEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder
String parentEndpointName = Strings.isNullOrEmpty(
standardBuilder.getParentEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder
.getParentEndpointName();
int parentServiceId = serviceInstanceInventoryCache.get(standardBuilder.getParentServiceInstanceId())
.getServiceId();
int parentEndpointId = getEndpointId(parentServiceId, parentEndpointName);
if (parentEndpointId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("parent endpoint name: {} from service id: {} exchange failed", parentEndpointName, parentServiceId);
if (log.isDebugEnabled()) {
log.debug(
"parent endpoint name: {} from service id: {} exchange failed", parentEndpointName,
parentServiceId
);
}
exchanged = false;
......@@ -110,11 +122,15 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
if (standardBuilder.getNetworkAddressId() == 0 && !Strings.isNullOrEmpty(standardBuilder.getNetworkAddress())) {
int networkAddressId = networkAddressInventoryRegister.getOrCreate(standardBuilder.getNetworkAddress(), null);
int networkAddressId = networkAddressInventoryRegister.getOrCreate(
standardBuilder.getNetworkAddress(), null);
if (networkAddressId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("network getAddress: {} from service id: {} exchange failed", standardBuilder.getNetworkAddress(), serviceId);
if (log.isDebugEnabled()) {
log.debug(
"network getAddress: {} from service id: {} exchange failed",
standardBuilder.getNetworkAddress(), serviceId
);
}
exchanged = false;
......
......@@ -20,6 +20,8 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standard
import com.google.common.base.Strings;
import com.google.gson.JsonObject;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.SpanType;
......@@ -38,15 +40,15 @@ import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryR
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* SpanExchanger process the segment owner(service/instance) ID register, including operation name and network address.
*
* @since 6.6.0 only the operation name of entry span is registered as the endpoint, others keep the operation name as
* the literal string.
*/
@Slf4j
public class SpanExchanger implements IdExchanger<SpanDecorator> {
private static final Logger logger = LoggerFactory.getLogger(SpanExchanger.class);
private static SpanExchanger EXCHANGER;
private final ServiceInventoryCache serviceInventoryCacheDAO;
private final IServiceInventoryRegister serviceInventoryRegister;
......@@ -95,8 +97,9 @@ public class SpanExchanger implements IdExchanger<SpanDecorator> {
int componentId = componentLibraryCatalogService.getComponentId(standardBuilder.getComponent());
if (componentId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("component: {} in service: {} exchange failed", standardBuilder.getComponent(), serviceId);
if (log.isDebugEnabled()) {
log.debug(
"component: {} in service: {} exchange failed", standardBuilder.getComponent(), serviceId);
}
exchanged = false;
......@@ -109,11 +112,12 @@ public class SpanExchanger implements IdExchanger<SpanDecorator> {
int peerId = standardBuilder.getPeerId();
if (peerId == 0 && !Strings.isNullOrEmpty(standardBuilder.getPeer())) {
peerId = networkAddressInventoryRegister.getOrCreate(standardBuilder.getPeer(), buildServiceProperties(standardBuilder));
peerId = networkAddressInventoryRegister.getOrCreate(
standardBuilder.getPeer(), buildServiceProperties(standardBuilder));
if (peerId == Const.NONE) {
if (logger.isDebugEnabled()) {
logger.debug("peer: {} in service: {} exchange failed", standardBuilder.getPeer(), serviceId);
if (log.isDebugEnabled()) {
log.debug("peer: {} in service: {} exchange failed", standardBuilder.getPeer(), serviceId);
}
exchanged = false;
......@@ -135,7 +139,8 @@ public class SpanExchanger implements IdExchanger<SpanDecorator> {
* it will only be updated at the first time for now.
*/
JsonObject properties = null;
ServiceInventory newServiceInventory = serviceInventoryCacheDAO.get(serviceInventoryCacheDAO.getServiceId(peerId));
ServiceInventory newServiceInventory = serviceInventoryCacheDAO.get(
serviceInventoryCacheDAO.getServiceId(peerId));
if (SpanLayer.Database.equals(standardBuilder.getSpanLayer())) {
if (!newServiceInventory.hasProperties()) {
properties = buildServiceProperties(standardBuilder);
......@@ -143,8 +148,9 @@ public class SpanExchanger implements IdExchanger<SpanDecorator> {
}
serviceInventoryRegister.update(newServiceInventory.getSequence(), nodeType, properties);
ServiceInstanceInventory newServiceInstanceInventory = serviceInstanceInventoryCacheDAO.get(serviceInstanceInventoryCacheDAO
.getServiceInstanceId(newServiceInventory.getSequence(), peerId));
ServiceInstanceInventory newServiceInstanceInventory = serviceInstanceInventoryCacheDAO.get(
serviceInstanceInventoryCacheDAO
.getServiceInstanceId(newServiceInventory.getSequence(), peerId));
serviceInstanceInventoryRegister.update(newServiceInstanceInventory.getSequence(), nodeType, properties);
}
......@@ -154,14 +160,16 @@ public class SpanExchanger implements IdExchanger<SpanDecorator> {
* so, since 6.6.0, only it triggers register.
*/
if (SpanType.Entry.equals(standardBuilder.getSpanType())) {
String endpointName = Strings.isNullOrEmpty(standardBuilder.getOperationName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder
String endpointName = Strings.isNullOrEmpty(
standardBuilder.getOperationName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder
.getOperationName();
int endpointId = endpointInventoryRegister.getOrCreate(serviceId, endpointName, DetectPoint.fromSpanType(standardBuilder
.getSpanType()));
int endpointId = endpointInventoryRegister.getOrCreate(
serviceId, endpointName, DetectPoint.fromSpanType(standardBuilder
.getSpanType()));
if (endpointId == 0) {
if (logger.isDebugEnabled()) {
logger.debug("endpoint name: {} from service id: {} exchange failed", endpointName, serviceId);
if (log.isDebugEnabled()) {
log.debug("endpoint name: {} from service id: {} exchange failed", endpointName, serviceId);
}
exchanged = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册