提交 54614a05 编写于 作者: K kezhenxu94 提交者: wu-sheng

Support gateway without agent (#3308)

* Support Gateway without agent
上级 b3a2c647
......@@ -27,6 +27,10 @@ pipeline {
skipStagesAfterUnstable()
}
environment {
MAVEN_OPTS = '-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx3g'
}
stages {
stage('Install & Test') {
parallel {
......
......@@ -26,7 +26,7 @@ pipeline {
}
environment {
MAVEN_OPTS = '-XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx3g'
MAVEN_OPTS = '-Dmaven.repo.local=.m2/repository -XX:+TieredCompilation -XX:TieredStopAtLevel=1 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -Xmx3g'
}
stages {
......@@ -55,7 +55,7 @@ pipeline {
stage('Compile Test Codes') {
steps {
sh './mvnw -f test/e2e/pom.xml clean'
sh './mvnw -f test/e2e/pom.xml -pl e2e-base clean install'
}
}
......
......@@ -23,13 +23,11 @@ import com.google.common.collect.Lists;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.Constants;
import org.hamcrest.core.StringContains;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.PrintStream;
import java.util.List;
import static org.mockito.Matchers.anyString;
......@@ -42,28 +40,20 @@ public class PatternLoggerTest {
public static final String PATTERN = "%timestamp+0800 %level [%agent_name,,,] [%thread] %class:-1 %msg %throwable";
private static PrintStream OUT_REF;
private static PrintStream ERR_REF;
@BeforeClass
public static void initAndHoldOut() {
OUT_REF = System.out;
ERR_REF = System.err;
Config.Agent.SERVICE_NAME = "testAppFromConfig";
}
@Test
public void testLog() {
PrintStream output = Mockito.mock(PrintStream.class);
System.setOut(output);
PrintStream err = Mockito.mock(PrintStream.class);
System.setErr(err);
final IWriter output = Mockito.mock(IWriter.class);
PatternLogger logger = new PatternLogger(PatternLoggerTest.class, PATTERN) {
@Override
protected void logger(LogLevel level, String message, Throwable e) {
String r = format(level, message, e);
SystemOutWriter.INSTANCE.write(r);
output.write(r);
}
};
......@@ -84,19 +74,16 @@ public class PatternLoggerTest {
logger.error(new NullPointerException(), "hello {}", "world");
Mockito.verify(output, times(9))
.println(anyString());
.write(anyString());
}
@Test
public void testLogWithSpecialChar() {
PrintStream output = Mockito.mock(PrintStream.class);
System.setOut(output);
PrintStream err = Mockito.mock(PrintStream.class);
System.setErr(err);
final IWriter output = Mockito.mock(IWriter.class);
PatternLogger logger = new PatternLogger(PatternLoggerTest.class, PATTERN) {
@Override
protected void logger(LogLevel level, String message, Throwable e) {
SystemOutWriter.INSTANCE.write(format(level, message, e));
output.write(format(level, message, e));
}
};
......@@ -117,7 +104,7 @@ public class PatternLoggerTest {
logger.error(new NullPointerException(), "hello {}", "&&&**%%");
Mockito.verify(output, times(9))
.println(anyString());
.write(anyString());
}
@Test
......@@ -156,10 +143,4 @@ public class PatternLoggerTest {
}
@AfterClass
public static void reset() {
System.setOut(OUT_REF);
System.setErr(ERR_REF);
}
}
......@@ -83,6 +83,8 @@ system.
set the expired time for each dimension.
1. [Dynamic Configuration](dynamic-config.md). Make configuration of OAP changed dynamic, from remote service
or 3rd party configuration management system.
1. [Uninstrumented Gateways](uninstrumented-gateways.md). Configure gateways/proxies that are not supported by SkyWalking agent plugins,
to reflect the delegation in topology graph.
## Telemetry for backend
OAP backend cluster itself underlying is a distributed streaming process system. For helping the Ops team,
......@@ -112,4 +114,4 @@ For example, metrics time will be formatted like YYYYMMDDHHmm in minute dimensio
which format process is timezone related.
In default, SkyWalking OAP backend choose the OS default timezone.
If you want to override it, please follow Java and OS documents to do so.
\ No newline at end of file
If you want to override it, please follow Java and OS documents to do so.
......@@ -7,6 +7,7 @@ Right now, SkyWalking supports following dynamic configurations.
| Config Key | Value Description | Value Format Example |
|:----:|:----:|:----:|
|receiver-trace.default.slowDBAccessThreshold| Thresholds of slow Database statement, override `receiver-trace/default/slowDBAccessThreshold` of `applciation.yml`. | default:200,mongodb:50|
|receiver-trace.default.uninstrumentedGateways| The uninstrumented gateways, override `gateways.yml`. | not set |
This feature depends on upstream service, so it is **OFF** as default.
......
# Uninstrumented Gateways/Proxies
The uninstrumented gateways are not instrumented by SkyWalking agent plugin when they are started,
but they can be configured in `gateways.yml` file or via [Dynamic Configuration](dynamic-config.md). The reason why they can't register
to backend automatically is that there're no suitable agent plugins, for example, there is no agent plugins for Nginx, haproxy, etc.
So in order to visualize the real topology, we provide a way to configure the gateways/proxies manually.
## Configuration Format
The configuration content includes the gateways' names and their instances:
```yml
gateways:
- name: proxy0 # the name is not used for now
instances:
- host: 127.0.0.1 # the host/ip of this gateway instance
port: 9099 # the port of this gateway instance, defaults to 80
```
**Note** that the `host` of the instance must be the one that is actually used in client side, for example,
if the instance `proxyA` has 2 IPs, say `192.168.1.110` and `192.168.1.111`, both of which delegates the target service,
and the client connects to `192.168.1.110`, then configuring `192.168.1.111` as the `host` won't work properly.
......@@ -58,6 +58,8 @@ public class ServiceInventory extends RegisterSource {
@Getter(AccessLevel.PRIVATE) @Column(columnName = PROPERTIES) private String prop = Const.EMPTY_JSON_OBJECT_STRING;
@Getter private JsonObject properties;
@Getter @Setter private boolean resetServiceMapping = false;
public NodeType getServiceNodeType() {
return NodeType.get(this.nodeType);
}
......@@ -119,6 +121,7 @@ public class ServiceInventory extends RegisterSource {
inventory.setAddressId(addressId);
inventory.setLastUpdateTime(getLastUpdateTime());
inventory.setMappingServiceId(mappingServiceId);
inventory.setResetServiceMapping(resetServiceMapping);
inventory.setProp(prop);
return inventory;
......@@ -150,6 +153,7 @@ public class ServiceInventory extends RegisterSource {
remoteBuilder.addDataIntegers(addressId);
remoteBuilder.addDataIntegers(mappingServiceId);
remoteBuilder.addDataIntegers(nodeType);
remoteBuilder.addDataIntegers(resetServiceMapping ? 1 : 0);
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
......@@ -166,6 +170,7 @@ public class ServiceInventory extends RegisterSource {
setAddressId(remoteData.getDataIntegers(2));
setMappingServiceId(remoteData.getDataIntegers(3));
setNodeType(remoteData.getDataIntegers(4));
setResetServiceMapping(remoteData.getDataIntegers(5) == 1);
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
......@@ -186,8 +191,11 @@ public class ServiceInventory extends RegisterSource {
if (serviceInventory.getLastUpdateTime() >= this.getLastUpdateTime()) {
this.nodeType = serviceInventory.getNodeType();
this.resetServiceMapping = serviceInventory.isResetServiceMapping();
setProp(serviceInventory.getProp());
if (Const.NONE != serviceInventory.getMappingServiceId()) {
if (serviceInventory.isResetServiceMapping()) {
this.mappingServiceId = Const.NONE;
} else if (Const.NONE != serviceInventory.getMappingServiceId()) {
this.mappingServiceId = serviceInventory.getMappingServiceId();
}
isChanged = true;
......
......@@ -36,4 +36,16 @@ public interface IServiceInventoryRegister extends Service {
void heartbeat(int serviceId, long heartBeatTime);
void updateMapping(int serviceId, int mappingServiceId);
/**
* Reset the {@link org.apache.skywalking.oap.server.core.register.ServiceInventory#mappingServiceId}
* of a given service id.
*
* There are cases when the mapping service id needs to be reset to {@code 0}, for example, when an
* uninstrumented gateway joins, the mapping service id of the services that are delegated by this gateway
* should be reset to {@code 0}, allowing the gateway to appear in the topology, see #3308 for more detail.
*
* @param serviceId id of the service whose mapping service id is to be reset
*/
void resetMapping(int serviceId);
}
......@@ -19,14 +19,18 @@
package org.apache.skywalking.oap.server.core.register.service;
import com.google.gson.JsonObject;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.register.NodeType;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import static java.util.Objects.isNull;
......@@ -132,6 +136,19 @@ public class ServiceInventoryRegister implements IServiceInventoryRegister {
}
}
@Override public void resetMapping(int serviceId) {
ServiceInventory serviceInventory = getServiceInventoryCache().get(serviceId);
if (Objects.nonNull(serviceInventory)) {
serviceInventory = serviceInventory.getClone();
serviceInventory.setLastUpdateTime(System.currentTimeMillis());
serviceInventory.setResetServiceMapping(true);
InventoryStreamProcessor.getInstance().in(serviceInventory);
} else {
logger.warn("Service {} mapping update, but not found in storage.", serviceId);
}
}
private boolean compare(ServiceInventory newServiceInventory, NodeType nodeType) {
if (Objects.nonNull(newServiceInventory)) {
return nodeType.equals(newServiceInventory.getServiceNodeType());
......
......@@ -87,7 +87,7 @@
</ports>
<wait>
<log>started</log>
<time>30000</time>
<time>60000</time>
</wait>
<env>
<discovery.type>single-node</discovery.type>
......
......@@ -18,23 +18,34 @@
package org.apache.skywalking.oap.server.receiver.trace.provider;
import java.io.IOException;
import org.apache.skywalking.oap.server.configuration.api.*;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.grpc.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v5.rest.TraceSegmentServletHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.v6.grpc.TraceSegmentReportServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParseV2;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserServiceImpl;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
......@@ -44,6 +55,7 @@ public class TraceModuleProvider extends ModuleProvider {
private SegmentParse.Producer segmentProducer;
private SegmentParseV2.Producer segmentProducerV2;
private DBLatencyThresholdsAndWatcher thresholds;
private UninstrumentedGatewaysConfig uninstrumentedGatewaysConfig;
public TraceModuleProvider() {
this.moduleConfig = new TraceServiceModuleConfig();
......@@ -64,7 +76,10 @@ public class TraceModuleProvider extends ModuleProvider {
@Override public void prepare() throws ServiceNotProvidedException {
thresholds = new DBLatencyThresholdsAndWatcher(moduleConfig.getSlowDBAccessThreshold(), this);
uninstrumentedGatewaysConfig = new UninstrumentedGatewaysConfig(this);
moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);
moduleConfig.setUninstrumentedGatewaysConfig(uninstrumentedGatewaysConfig);
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager(), moduleConfig);
segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager(), moduleConfig);
......@@ -89,6 +104,7 @@ public class TraceModuleProvider extends ModuleProvider {
JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(JettyHandlerRegister.class);
try {
dynamicConfigurationService.registerConfigChangeWatcher(thresholds);
dynamicConfigurationService.registerConfigChangeWatcher(uninstrumentedGatewaysConfig);
grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer));
grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2, getManager()));
......
......@@ -39,6 +39,7 @@ public class TraceServiceModuleConfig extends ModuleConfig {
*/
@Setter @Getter private String slowDBAccessThreshold = "default:200";
@Setter @Getter private DBLatencyThresholdsAndWatcher dbLatencyThresholdsAndWatcher;
@Setter @Getter private UninstrumentedGatewaysConfig uninstrumentedGatewaysConfig;
/**
* Analysis trace status.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.yaml.snakeyaml.Yaml;
import java.io.FileNotFoundException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static java.util.Objects.isNull;
/**
* @author kezhenxu94
*/
@Slf4j
public class UninstrumentedGatewaysConfig extends ConfigChangeWatcher {
private final AtomicReference<String> settingsString;
private volatile Map<String, GatewayInstanceInfo> gatewayInstanceKeyedByAddress = Collections.emptyMap();
UninstrumentedGatewaysConfig(TraceModuleProvider provider) {
super(TraceModule.NAME, provider, "uninstrumentedGateways");
this.settingsString = new AtomicReference<>(Const.EMPTY_STRING);
final GatewayInfos defaultGateways = parseGatewaysFromFile("gateways.yml");
log.info("Default configured gateways: {}", defaultGateways);
onGatewaysUpdated(defaultGateways);
}
private void activeSetting(String config) {
if (log.isDebugEnabled()) {
log.debug("Updating using new static config: {}", config);
}
this.settingsString.set(config);
onGatewaysUpdated(parseGatewaysFromYml(config));
}
@Override
public void notify(ConfigChangeEvent value) {
if (EventType.DELETE.equals(value.getEventType())) {
activeSetting("");
} else {
activeSetting(value.getNewValue());
}
}
@Override
public String value() {
return settingsString.get();
}
private void onGatewaysUpdated(final GatewayInfos gateways) {
log.info("Updating uninstrumented gateways with: {}", gateways);
if (isNull(gateways)) {
gatewayInstanceKeyedByAddress = Collections.emptyMap();
} else {
gatewayInstanceKeyedByAddress =
StreamSupport.stream(gateways.spliterator(), false)
.flatMap(instance -> instance.getInstances().stream())
.collect(Collectors.toMap(GatewayInstanceInfo::getAddress, Function.identity()));
}
}
public boolean isAddressConfiguredAsGateway(final String address) {
final boolean isConfiguredAsGateway = gatewayInstanceKeyedByAddress.get(address) != null;
if (log.isDebugEnabled()) {
log.debug("Address [{}] is configured as gateway: {}", address, isConfiguredAsGateway);
}
return isConfiguredAsGateway;
}
private GatewayInfos parseGatewaysFromFile(final String file) {
try {
final Reader reader = ResourceUtils.read(file);
return new Yaml().loadAs(reader, GatewayInfos.class);
} catch (FileNotFoundException e) {
log.error("Cannot load gateways from: {}", file, e);
}
return GatewayInfos.EMPTY;
}
private GatewayInfos parseGatewaysFromYml(final String ymlContent) {
try {
return new Yaml().loadAs(ymlContent, GatewayInfos.class);
} catch (Exception e) {
log.error("Failed to parse yml content as gateways: \n{}", ymlContent, e);
}
return GatewayInfos.EMPTY;
}
@Getter
@Setter
@ToString
public static class GatewayInfo {
private String name;
private List<GatewayInstanceInfo> instances;
}
@ToString
public static class GatewayInfos implements Iterable<GatewayInfo> {
static final GatewayInfos EMPTY = new GatewayInfos();
@Getter
@Setter
private Collection<GatewayInfo> gateways;
GatewayInfos() {
gateways = new ArrayList<>();
}
@Override
public Iterator<GatewayInfo> iterator() {
return gateways.iterator();
}
}
@Getter
@Setter
@ToString
public static class GatewayInstanceInfo {
private String host;
private Integer port;
String getAddress() {
return getHost() + ":" + (isNull(getPort()) || getPort() <= 0 ? "80" : getPort());
}
}
}
......@@ -18,20 +18,31 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint;
import java.util.*;
import org.apache.skywalking.apm.network.common.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.receiver.trace.provider.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.DBLatencyThresholdsAndWatcher;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.ReferenceDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.GlobalTraceIdsListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import static java.util.Objects.nonNull;
......@@ -57,6 +68,7 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
private final List<SourceBuilder> exitSourceBuilders;
private final List<DatabaseSlowStatement> slowDatabaseAccesses;
private final TraceServiceModuleConfig config;
private final NetworkAddressInventoryCache networkAddressInventoryCache;
private SpanDecorator entrySpanDecorator;
private long minuteTimeBucket;
private String traceId;
......@@ -69,6 +81,7 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
this.endpointInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
this.config = config;
this.traceId = null;
}
......@@ -87,9 +100,12 @@ public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListe
SourceBuilder sourceBuilder = new SourceBuilder();
sourceBuilder.setSourceEndpointId(reference.getParentEndpointId());
if (spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
int serviceIdByPeerId = serviceInventoryCache.getServiceId(reference.getNetworkAddressId());
int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId(serviceIdByPeerId, reference.getNetworkAddressId());
final int networkAddressId = reference.getNetworkAddressId();
final int serviceIdByPeerId = serviceInventoryCache.getServiceId(networkAddressId);
final String address = networkAddressInventoryCache.get(networkAddressId).getName();
if (spanDecorator.getSpanLayer().equals(SpanLayer.MQ) || config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) {
int instanceIdByPeerId = instanceInventoryCache.getServiceInstanceId(serviceIdByPeerId, networkAddressId);
sourceBuilder.setSourceServiceInstanceId(instanceIdByPeerId);
sourceBuilder.setSourceServiceId(serviceIdByPeerId);
} else {
......
......@@ -18,17 +18,25 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service;
import java.util.*;
import lombok.*;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SegmentCoreInfo;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.SpanDecorator;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntrySpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
/**
* @author peng-yongsheng
......@@ -38,12 +46,17 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
private static final Logger logger = LoggerFactory.getLogger(ServiceMappingSpanListener.class);
private final IServiceInventoryRegister serviceInventoryRegister;
private final TraceServiceModuleConfig config;
private final ServiceInventoryCache serviceInventoryCache;
private List<ServiceMapping> serviceMappings = new LinkedList<>();
private final NetworkAddressInventoryCache networkAddressInventoryCache;
private final List<ServiceMapping> serviceMappings = new LinkedList<>();
private final List<Integer> servicesToResetMapping = new LinkedList<>();
private ServiceMappingSpanListener(ModuleManager moduleManager) {
private ServiceMappingSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
this.networkAddressInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
this.config = config;
}
@Override public boolean containsPoint(Point point) {
......@@ -58,9 +71,16 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
int serviceId = serviceInventoryCache.getServiceId(spanDecorator.getRefs(i).getNetworkAddressId());
int mappingServiceId = serviceInventoryCache.get(serviceId).getMappingServiceId();
if (mappingServiceId != segmentCoreInfo.getServiceId()) {
int networkAddressId = spanDecorator.getRefs(i).getNetworkAddressId();
String address = networkAddressInventoryCache.get(networkAddressId).getName();
int serviceId = serviceInventoryCache.getServiceId(networkAddressId);
if (config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(address)) {
if (logger.isDebugEnabled()) {
logger.debug("{} is configured as gateway, will reset its mapping service id", serviceId);
}
servicesToResetMapping.add(serviceId);
} else {
ServiceMapping serviceMapping = new ServiceMapping();
serviceMapping.setServiceId(serviceId);
serviceMapping.setMappingServiceId(segmentCoreInfo.getServiceId());
......@@ -78,18 +98,24 @@ public class ServiceMappingSpanListener implements EntrySpanListener {
}
serviceInventoryRegister.updateMapping(serviceMapping.getServiceId(), serviceMapping.getMappingServiceId());
});
servicesToResetMapping.forEach(serviceId -> {
if (logger.isDebugEnabled()) {
logger.debug("service mapping listener build, reset mapping of service id: {}", serviceId);
}
serviceInventoryRegister.resetMapping(serviceId);
});
}
public static class Factory implements SpanListenerFactory {
@Override public SpanListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
return new ServiceMappingSpanListener(moduleManager);
return new ServiceMappingSpanListener(moduleManager, config);
}
}
@Setter
@Getter
private class ServiceMapping {
private static class ServiceMapping {
private int serviceId;
private int mappingServiceId;
}
......
......@@ -78,6 +78,9 @@ public class SpringSleuthSegmentBuilderTest implements SegmentListener {
}
@Override public void resetMapping(final int serviceId) {
}
};
IServiceInstanceInventoryRegister instanceIDService = new IServiceInstanceInventoryRegister() {
......@@ -193,4 +196,4 @@ public class SpringSleuthSegmentBuilderTest implements SegmentListener {
Assert.assertEquals(-1, spanObject.getParentSpanId());
Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
}
}
\ No newline at end of file
}
文件模式从 100644 更改为 100755
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#gateways:
# - name: proxy0
# instances:
# - host: 127.0.0.1 # the host/ip of this gateway instance
# port: 9099 # the port of this gateway instance, defaults to 80
......@@ -424,6 +424,8 @@
<exclude>.mvn/wrapper/maven-wrapper.properties</exclude>
<exclude>tools/dependencies/known-oap-backend-dependencies.txt</exclude>
<exclude>apm-checkstyle/CHECKSTYLE_HEAD</exclude>
<exclude>.m2/**</exclude>
</excludes>
</configuration>
<executions>
......
......@@ -43,7 +43,7 @@ public class TestController {
public User createAuthor(@RequestBody final User user) throws InterruptedException {
Thread.sleep(1000L);
final ResponseEntity<User> response = restTemplate.postForEntity(
"http://localhost:9090/e2e/users", user, User.class
"http://127.0.0.1:9099/e2e/users", user, User.class
);
return response.getBody();
}
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>e2e-cluster</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>gateway</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-zuul</artifactId>
<version>${spring.cloud.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<executable>true</executable>
<addResources>true</addResources>
<excludeDevtools>true</excludeDevtools>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.e2e;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;
/**
* @author kezhenxu94
*/
@EnableZuulProxy
@SpringBootApplication
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
server:
port: 9099
spring:
application:
name: SkyWalking
zuul:
ignoredServices: '*'
routes:
api:
path: /e2e/users
url: http://127.0.0.1:9090
......@@ -33,6 +33,7 @@
<modules>
<module>provider</module>
<module>consumer</module>
<module>gateway</module>
<module>test-runner</module>
</modules>
</project>
......@@ -38,6 +38,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>gateway</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>provider</artifactId>
......@@ -53,8 +59,9 @@
</dependencies>
<properties>
<service0.name>provider</service0.name>
<service1.name>consumer</service1.name>
<provider.name>provider</provider.name>
<consumer.name>consumer</consumer.name>
<gateway.name>gateway</gateway.name>
<e2e.container.version>1.1</e2e.container.version>
<e2e.container.name.prefix>skywalking-e2e-container-${build.id}-cluster</e2e.container.name.prefix>
</properties>
......@@ -99,7 +106,7 @@
</ports>
<wait>
<log>binding to port</log>
<time>30000</time>
<time>120000</time>
</wait>
</run>
</image>
......@@ -117,22 +124,22 @@
</SW_CLUSTER_ZK_HOST_PORT>
<INSTRUMENTED_SERVICE_1>
${service0.name}-${project.version}.jar
${provider.name}-${project.version}.jar
</INSTRUMENTED_SERVICE_1>
<INSTRUMENTED_SERVICE_1_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800
-DSW_AGENT_NAME=${service0.name}
-DSW_AGENT_NAME=${provider.name}
</INSTRUMENTED_SERVICE_1_OPTS>
<INSTRUMENTED_SERVICE_1_ARGS>
--server.port=9090
</INSTRUMENTED_SERVICE_1_ARGS>
<INSTRUMENTED_SERVICE_2>
${service1.name}-${project.version}.jar
${consumer.name}-${project.version}.jar
</INSTRUMENTED_SERVICE_2>
<INSTRUMENTED_SERVICE_2_OPTS>
-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11801
-DSW_AGENT_NAME=${service1.name}
-DSW_AGENT_NAME=${consumer.name}
</INSTRUMENTED_SERVICE_2_OPTS>
<INSTRUMENTED_SERVICE_2_ARGS>
--server.port=9091
......@@ -156,10 +163,13 @@
../../../../dist-for-cluster/apache-skywalking-apm-bin:/sw
</volume>
<volume>
../${service0.name}/target/${service0.name}-${project.version}.jar:/home/${service0.name}-${project.version}.jar
../${gateway.name}/target/${gateway.name}-${project.version}.jar:/home/${gateway.name}-${project.version}.jar
</volume>
<volume>
../${provider.name}/target/${provider.name}-${project.version}.jar:/home/${provider.name}-${project.version}.jar
</volume>
<volume>
../${service1.name}/target/${service1.name}-${project.version}.jar:/home/${service1.name}-${project.version}.jar
../${consumer.name}/target/${consumer.name}-${project.version}.jar:/home/${consumer.name}-${project.version}.jar
</volume>
<volume>
${project.basedir}/src/docker/rc.d:/rc.d:ro
......@@ -171,7 +181,7 @@
</volumes>
<wait>
<log>SkyWalking e2e container is ready for tests</log>
<time>2400000</time>
<time>3000000</time>
</wait>
</run>
</image>
......
......@@ -22,7 +22,17 @@ if test "${MODE}" = "cluster"; then
# substitute application.yml to be capable of cluster mode
cd ${SW_HOME}/config \
&& awk -f /clusterize.awk application.yml > clusterized_app.yml \
&& mv clusterized_app.yml application.yml
&& mv clusterized_app.yml application.yml \
&& echo '
gateways:
- name: proxy0
instances:
- host: 127.0.0.1 # the host/ip of this gateway instance
port: 9099 # the port of this gateway instance, defaults to 80
' > gateways.yml \
&& sed '/<Loggers>/a<logger name="org.apache.skywalking.oap.server.receiver.trace.provider.UninstrumentedGatewaysConfig" level="DEBUG"/>\
\n<logger name="org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.ServiceMappingSpanListener" level="DEBUG"/>' log4j2.xml > log4j2debuggable.xml \
&& mv log4j2debuggable.xml log4j2.xml
cd ${SW_HOME}/webapp \
&& awk '/^\s+listOfServers:/ {gsub("listOfServers:.*", "listOfServers: 127.0.0.1:12800,127.0.0.1:12801", $0)} {print}' webapp.yml > clusterized_webapp.yml \
......
......@@ -16,12 +16,27 @@
#!/usr/bin/env bash
echo 'starting gateway service...' \
&& java -jar /home/gateway-1.0.0.jar 2>&1 > /tmp/gateway.log &
check_tcp 127.0.0.1 \
9099 \
60 \
10 \
'waiting for the gateway service to be ready'
if [[ $? -ne 0 ]]; then
echo "gateway service failed to start in 60 * 10 seconds: "
cat /tmp/gateway.log
exit 1
fi
echo 'starting OAP server...' \
&& SW_STORAGE_ES_BULK_ACTIONS=1 \
&& SW_STORAGE_ES_FLUSH_INTERVAL=1 \
&& SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer1 \
&& SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer1 \
&& start_oap 'init'
SW_STORAGE_ES_FLUSH_INTERVAL=1 \
SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer1 \
SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer1 \
start_oap 'init'
echo 'starting Web app...' \
&& start_webapp '0.0.0.0' 8081
......@@ -30,12 +45,12 @@ if test "${MODE}" = "cluster"; then
# start another OAP server in a different port
echo 'starting OAP server...' \
&& SW_CORE_GRPC_PORT=11801 \
&& SW_CORE_REST_PORT=12801 \
&& SW_STORAGE_ES_BULK_ACTIONS=1 \
&& SW_STORAGE_ES_FLUSH_INTERVAL=1 \
&& SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer2 \
&& SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer2 \
&& start_oap 'no-init'
SW_CORE_REST_PORT=12801 \
SW_STORAGE_ES_BULK_ACTIONS=1 \
SW_STORAGE_ES_FLUSH_INTERVAL=1 \
SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer2 \
SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer2 \
start_oap 'no-init'
fi
echo 'starting instrumented services...' && start_instrumented_services
......
......@@ -45,7 +45,6 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
......@@ -61,10 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.ALL_ENDPOINT_METRICS;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.ALL_INSTANCE_METRICS;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.ALL_SERVICE_METRICS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.*;
/**
* @author kezhenxu94
......@@ -118,18 +114,29 @@ public class ClusterVerificationITCase {
}
private void verifyTopo(LocalDateTime minutesAgo) throws Exception {
final TopoData topoData = queryClient.topo(
new TopoQuery()
.stepByMinute()
.start(minutesAgo)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1))
);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ClusterVerificationITCase.topo.yml").getInputStream();
final TopoMatcher topoMatcher = yaml.loadAs(expectedInputStream, TopoMatcher.class);
topoMatcher.verify(topoData);
boolean valid = false;
while (!valid) {
try {
final TopoData topoData = queryClient.topo(
new TopoQuery()
.stepByMinute()
.start(minutesAgo)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1))
);
LOGGER.info("Actual topology: {}", topoData);
InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ClusterVerificationITCase.topo.yml").getInputStream();
final TopoMatcher topoMatcher = yaml.loadAs(expectedInputStream, TopoMatcher.class);
topoMatcher.verify(topoData);
valid = true;
} catch (Throwable t) {
LOGGER.warn(t.getMessage(), t);
generateTraffic();
Thread.sleep(retryInterval);
}
}
}
private void verifyServices(LocalDateTime minutesAgo) throws Exception {
......@@ -333,14 +340,17 @@ public class ClusterVerificationITCase {
}
private void generateTraffic() {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
LOGGER.info("responseEntity: {}, {}", responseEntity.getStatusCode(), responseEntity.getBody());
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
try {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
LOGGER.info("responseEntity: {}, {}", responseEntity.getStatusCode(), responseEntity.getBody());
} catch (Throwable t) {
LOGGER.warn(t.getMessage(), t);
}
}
}
......@@ -27,6 +27,9 @@ nodes:
- id: not null
name: "localhost:-1"
type: H2
- id: not null
name: "127.0.0.1:9099"
type: Unknown
calls:
- id: not null
source: not null
......@@ -37,3 +40,6 @@ calls:
- id: not null
source: not null
target: not null
- id: not null
source: not null
target: not null
......@@ -46,6 +46,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.boot.version>2.1.5.RELEASE</spring.boot.version>
<spring.cloud.version>2.1.2.RELEASE</spring.cloud.version>
<faster.jackson.version>2.9.7</faster.jackson.version>
<junit.version>4.12</junit.version>
<jackson.version>2.9.7</jackson.version>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册