Unverified commit f9257f52, authored by Daming, committed by GitHub

Provide kafka as collector/reporter (#4847)

Parent commit: 5f769ca2
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# End-to-end tests for the Kafka transport (collector/reporter):
# base scenario, profiling scenario, and meter-system scenario,
# each built and executed in its own isolated job.
name: E2E

on:
  pull_request:
  push:
    branches:
      - master
    tags:
      - 'v*'

env:
  # Skip unit tests during the docker build; verification happens in the E2E steps.
  SKIP_TEST: true
  SW_AGENT_JDK_VERSION: 8
  # SECURITY(review): the Codecov upload token was previously committed here in
  # plain text. Keep it in the repository's encrypted secrets instead.
  CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

jobs:
  Kafka:
    name: Kafka
    runs-on: ubuntu-latest
    timeout-minutes: 90
    steps:
      - uses: actions/checkout@v2
        with:
          submodules: true
      - name: Compile and Build
        run: make docker
      - name: Copy dist package
        run: cp -R dist test/e2e/
      - name: Kafka
        run: ./mvnw --batch-mode -f test/e2e/pom.xml -am -DfailIfNoTests=false verify -Dit.test=org.apache.skywalking.e2e.kafka.KafkaE2E
      - name: Report Coverage
        run: bash -x tools/coverage/report.sh
      # Upload container/test logs to help diagnose failed runs.
      - uses: actions/upload-artifact@v1
        if: failure()
        with:
          name: logs
          path: logs

  profile:
    name: Kafka(profiling)
    runs-on: ubuntu-latest
    timeout-minutes: 90
    env:
      # The profiling scenario runs against InfluxDB storage.
      SW_STORAGE: influxdb
    steps:
      - uses: actions/checkout@v2
        with:
          submodules: true
      - name: Compile and Build
        run: make docker
      - name: Copy dist package
        run: cp -R dist test/e2e/
      - name: Kafka Profiling
        run: ./mvnw --batch-mode -f test/e2e/pom.xml -am -DfailIfNoTests=false verify -Dit.test=org.apache.skywalking.e2e.kafka.KafkaProfileE2E
      - name: Report Coverage
        run: bash -x tools/coverage/report.sh
      - uses: actions/upload-artifact@v1
        if: failure()
        with:
          name: logs
          path: logs

  meter:
    name: Kafka(meter)
    runs-on: ubuntu-latest
    timeout-minutes: 90
    steps:
      - uses: actions/checkout@v2
        with:
          submodules: true
      - name: Compile and Build
        run: make docker
      - name: Copy dist package
        run: cp -R dist test/e2e/
      - name: Kafka Meter System
        run: ./mvnw --batch-mode -f test/e2e/pom.xml -am -DfailIfNoTests=false verify -Dit.test=org.apache.skywalking.e2e.kafka.KafkaMeterE2E
      - name: Report Coverage
        run: bash -x tools/coverage/report.sh
      - uses: actions/upload-artifact@v1
        if: failure()
        with:
          name: logs
          path: logs
\ No newline at end of file
Subproject commit e3ec161a5edbc06f05f847a0f6d7033cb9d95632
Subproject commit 4be71766f60b943915d3ac8f91c996a5ff680ca0
......@@ -152,6 +152,7 @@ public class Config {
* Get profile task list interval
*/
public static int GET_PROFILE_TASK_INTERVAL = 20;
}
public static class Profile {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.jvm;
import io.grpc.Channel;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetric;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetricCollection;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetricReportServiceGrpc;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/**
 * Collects {@link JVMMetric} samples into a bounded in-memory queue and, while the
 * gRPC channel is connected, flushes them to the backend collector when {@link #run()}
 * is invoked. Producers hand metrics in via {@link #offer(JVMMetric)}; the scheduler
 * driving {@code run()} lives outside this class.
 */
@DefaultImplementor
public class JVMMetricsSender implements BootService, Runnable, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(JVMMetricsSender.class);
// Last observed channel state; run() only uploads while CONNECTED.
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
// Blocking report stub, (re)created on every reconnect in statusChanged().
private volatile JVMMetricReportServiceGrpc.JVMMetricReportServiceBlockingStub stub = null;
// Bounded buffer (capacity Config.Jvm.BUFFER_SIZE) of metrics awaiting upload.
private LinkedBlockingQueue<JVMMetric> queue;
@Override
public void prepare() {
queue = new LinkedBlockingQueue<>(Config.Jvm.BUFFER_SIZE);
// Subscribe to channel state so statusChanged() can build/refresh the stub.
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}
@Override
public void boot() {
// Nothing to start here; draining is driven externally via run().
}
/**
 * Enqueues a metric sample; when the queue is full the oldest entry is evicted so
 * the newest sample is kept.
 */
public void offer(JVMMetric metric) {
// drop last message and re-deliver
if (!queue.offer(metric)) {
// NOTE(review): poll()+offer() is not atomic — with concurrent producers the
// second offer() could still fail and drop this sample; fine for one producer.
queue.poll();
queue.offer(metric);
}
}
@Override
public void run() {
if (status == GRPCChannelStatus.CONNECTED) {
try {
JVMMetricCollection.Builder builder = JVMMetricCollection.newBuilder();
LinkedList<JVMMetric> buffer = new LinkedList<>();
// Take everything queued so far; metrics drained here are lost if the
// upload below fails (only a log entry is produced).
queue.drainTo(buffer);
if (buffer.size() > 0) {
builder.addAllMetrics(buffer);
builder.setService(Config.Agent.SERVICE_NAME);
builder.setServiceInstance(Config.Agent.INSTANCE_NAME);
// Deadline-bounded call so a dead channel cannot block the sender forever.
Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.collect(builder.build());
// The backend may piggy-back commands on the response; dispatch them.
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
} catch (Throwable t) {
logger.error(t, "send JVM metrics to Collector fail.");
}
}
}
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
stub = JVMMetricReportServiceGrpc.newBlockingStub(channel);
}
// The stub is assigned before the status flag flips to CONNECTED, so run()
// never observes CONNECTED with a null stub. The stub is intentionally not
// cleared on disconnect; run() is guarded by the status check above.
this.status = status;
}
@Override
public void onComplete() {
}
@Override
public void shutdown() {
}
}
......@@ -18,18 +18,13 @@
package org.apache.skywalking.apm.agent.core.jvm;
import io.grpc.Channel;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.jvm.cpu.CPUProvider;
import org.apache.skywalking.apm.agent.core.jvm.gc.GCProvider;
import org.apache.skywalking.apm.agent.core.jvm.memory.MemoryProvider;
......@@ -37,17 +32,10 @@ import org.apache.skywalking.apm.agent.core.jvm.memorypool.MemoryPoolProvider;
import org.apache.skywalking.apm.agent.core.jvm.thread.ThreadProvider;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetric;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetricCollection;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetricReportServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/**
* The <code>JVMService</code> represents a timer, which collectors JVM cpu, memory, memorypool and gc info, and send
* the collected info to Collector through the channel provided by {@link GRPCChannelManager}
......@@ -55,16 +43,13 @@ import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UP
@DefaultImplementor
public class JVMService implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(JVMService.class);
private LinkedBlockingQueue<JVMMetric> queue;
private volatile ScheduledFuture<?> collectMetricFuture;
private volatile ScheduledFuture<?> sendMetricFuture;
private Sender sender;
private JVMMetricsSender sender;
@Override
public void prepare() throws Throwable {
queue = new LinkedBlockingQueue<JVMMetric>(Config.Jvm.BUFFER_SIZE);
sender = new Sender();
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(sender);
sender = ServiceManager.INSTANCE.findService(JVMMetricsSender.class);
}
@Override
......@@ -75,12 +60,8 @@ public class JVMService implements BootService, Runnable {
this,
new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(
Throwable t) {
logger.error(
"JVMService produces metrics failure.",
t
);
public void handle(Throwable t) {
logger.error("JVMService produces metrics failure.", t);
}
}
), 0, 1, TimeUnit.SECONDS);
......@@ -90,12 +71,8 @@ public class JVMService implements BootService, Runnable {
sender,
new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(
Throwable t) {
logger.error(
"JVMService consumes and upload failure.",
t
);
public void handle(Throwable t) {
logger.error("JVMService consumes and upload failure.", t);
}
}
), 0, 1, TimeUnit.SECONDS);
......@@ -124,50 +101,9 @@ public class JVMService implements BootService, Runnable {
jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());
jvmBuilder.setThread(ThreadProvider.INSTANCE.getThreadMetrics());
JVMMetric jvmMetric = jvmBuilder.build();
if (!queue.offer(jvmMetric)) {
queue.poll();
queue.offer(jvmMetric);
}
sender.offer(jvmBuilder.build());
} catch (Exception e) {
logger.error(e, "Collect JVM info fail.");
}
}
private class Sender implements Runnable, GRPCChannelListener {
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile JVMMetricReportServiceGrpc.JVMMetricReportServiceBlockingStub stub = null;
@Override
public void run() {
if (status == GRPCChannelStatus.CONNECTED) {
try {
JVMMetricCollection.Builder builder = JVMMetricCollection.newBuilder();
LinkedList<JVMMetric> buffer = new LinkedList<JVMMetric>();
queue.drainTo(buffer);
if (buffer.size() > 0) {
builder.addAllMetrics(buffer);
builder.setService(Config.Agent.SERVICE_NAME);
builder.setServiceInstance(Config.Agent.INSTANCE_NAME);
Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.collect(builder.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
} catch (Throwable t) {
logger.error(t, "send JVM metrics to Collector fail.");
}
}
}
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
stub = JVMMetricReportServiceGrpc.newBlockingStub(channel);
}
this.status = status;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.meter;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.meter.transform.MeterTransformer;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/**
 * Uploads the transformed meters registered with {@link MeterService} to the backend
 * through a gRPC client stream, one {@link MeterData} message per meter, and blocks
 * until the stream completes.
 */
@DefaultImplementor
public class MeterSender implements BootService, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(MeterSender.class);
// Last observed channel state; send() is a no-op unless CONNECTED.
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
// Async report stub; rebuilt on reconnect and nulled on disconnect (see statusChanged).
private volatile MeterReportServiceGrpc.MeterReportServiceStub meterReportServiceStub;
@Override
public void prepare() {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}
@Override
public void boot() {
}
/**
 * Streams every transformable meter in {@code meterMap} to the collector.
 *
 * @param meterMap     registered meters keyed by id
 * @param meterService shut down when the backend answers UNIMPLEMENTED (no meter support)
 */
public void send(Map<MeterId, MeterTransformer> meterMap, MeterService meterService) {
if (status == GRPCChannelStatus.CONNECTED) {
StreamObserver<MeterData> reportStreamObserver = null;
// NOTE: this local deliberately shadows the channel-status field; it tracks
// completion of this one stream only.
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
try {
reportStreamObserver = meterReportServiceStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
}
@Override
public void onError(Throwable throwable) {
status.finished();
if (logger.isErrorEnable()) {
logger.error(throwable, "Send meters to collector fail with a grpc internal exception.");
}
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
});
// Effectively-final alias so the lambda below can capture the observer.
final StreamObserver<MeterData> reporter = reportStreamObserver;
transform(meterMap, meterData -> reporter.onNext(meterData));
} catch (Throwable e) {
if (!(e instanceof StatusRuntimeException)) {
logger.error(e, "Report meters to backend fail.");
// The finally block still runs after this return.
return;
}
final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e;
if (statusRuntimeException.getStatus().getCode() == Status.Code.UNIMPLEMENTED) {
// Backend has no meter receiver; stop the whole meter subsystem.
logger.warn("Backend doesn't support meter, it will be disabled");
meterService.shutdown();
}
} finally {
if (reportStreamObserver != null) {
reportStreamObserver.onCompleted();
}
// NOTE(review): if the stub call throws before any observer exists, nothing
// ever calls status.finished(); whether wait4Finish() can then block
// indefinitely depends on GRPCStreamServiceStatus — worth confirming.
status.wait4Finish();
}
}
}
/**
 * Builds MeterData messages from the registered transformers and feeds them to
 * {@code consumer}; service identity and timestamp are attached only to the first
 * message of the batch.
 */
protected void transform(final Map<MeterId, MeterTransformer> meterMap,
final Consumer<MeterData> consumer) {
// build and report meters
boolean hasSendMachineInfo = false;
for (MeterTransformer meterTransformer : meterMap.values()) {
final MeterData.Builder dataBuilder = meterTransformer.transform();
if (dataBuilder == null) {
continue;
}
// only send the service base info at the first data
if (!hasSendMachineInfo) {
dataBuilder.setService(Config.Agent.SERVICE_NAME);
dataBuilder.setServiceInstance(Config.Agent.INSTANCE_NAME);
dataBuilder.setTimestamp(System.currentTimeMillis());
hasSendMachineInfo = true;
}
consumer.accept(dataBuilder.build());
}
}
@Override
public void onComplete() {
}
@Override
public void shutdown() {
}
@Override
public void statusChanged(final GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
meterReportServiceStub = MeterReportServiceGrpc.newStub(channel);
} else {
// Drop the stub on disconnect so a stale channel is never used.
meterReportServiceStub = null;
}
this.status = status;
}
}
......@@ -18,10 +18,10 @@
package org.apache.skywalking.apm.agent.core.meter;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
......@@ -30,38 +30,20 @@ import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.meter.transform.MeterTransformer;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
@DefaultImplementor
public class MeterService implements BootService, Runnable, GRPCChannelListener {
public class MeterService implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(MeterService.class);
// all meters
private final ConcurrentHashMap<MeterId, MeterTransformer> meterMap = new ConcurrentHashMap<>();
// channel status
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
// gRPC stub
private volatile MeterReportServiceGrpc.MeterReportServiceStub meterReportServiceStub;
// report meters
private volatile ScheduledFuture<?> reportMeterFuture;
private MeterSender sender;
/**
* Register the meterTransformer
*/
......@@ -70,7 +52,8 @@ public class MeterService implements BootService, Runnable, GRPCChannelListener
return;
}
if (meterMap.size() >= Config.Meter.MAX_METER_SIZE) {
logger.warn("Already out of the meter system max size, will not report. meter name:{}", meterTransformer.getName());
logger.warn(
"Already out of the meter system max size, will not report. meter name:{}", meterTransformer.getName());
return;
}
......@@ -78,12 +61,12 @@ public class MeterService implements BootService, Runnable, GRPCChannelListener
}
@Override
public void prepare() throws Throwable {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
public void prepare() {
sender = ServiceManager.INSTANCE.findService(MeterSender.class);
}
@Override
public void boot() throws Throwable {
public void boot() {
if (Config.Meter.ACTIVE) {
reportMeterFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("MeterReportService")
......@@ -95,11 +78,11 @@ public class MeterService implements BootService, Runnable, GRPCChannelListener
}
@Override
public void onComplete() throws Throwable {
public void onComplete() {
}
@Override
public void shutdown() throws Throwable {
public void shutdown() {
if (reportMeterFuture != null) {
reportMeterFuture.cancel(true);
}
......@@ -109,82 +92,10 @@ public class MeterService implements BootService, Runnable, GRPCChannelListener
@Override
public void run() {
if (status != GRPCChannelStatus.CONNECTED || meterMap.isEmpty()) {
if (meterMap.isEmpty()) {
return;
}
StreamObserver<MeterData> reportStreamObserver = null;
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
try {
reportStreamObserver = meterReportServiceStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
}
@Override
public void onError(Throwable throwable) {
status.finished();
if (logger.isErrorEnable()) {
logger.error(throwable, "Send meters to collector fail with a grpc internal exception.");
}
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
});
// build and report meters
boolean hasSendMachineInfo = false;
for (MeterTransformer meterTransformer : meterMap.values()) {
final MeterData.Builder dataBuilder = meterTransformer.transform();
if (dataBuilder == null) {
continue;
}
// only send the service base info at the first data
if (!hasSendMachineInfo) {
dataBuilder.setService(Config.Agent.SERVICE_NAME);
dataBuilder.setServiceInstance(Config.Agent.INSTANCE_NAME);
dataBuilder.setTimestamp(System.currentTimeMillis());
hasSendMachineInfo = true;
}
reportStreamObserver.onNext(dataBuilder.build());
}
} catch (Throwable e) {
if (!(e instanceof StatusRuntimeException)) {
logger.error(e, "Report meters to backend fail.");
return;
}
final StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e;
if (statusRuntimeException.getStatus().getCode() == Status.Code.UNIMPLEMENTED) {
logger.warn("Backend doesn't support meter, it will be disabled");
if (reportMeterFuture != null) {
reportMeterFuture.cancel(true);
}
}
} finally {
if (reportStreamObserver != null) {
reportStreamObserver.onCompleted();
}
status.wait4Finish();
}
}
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
meterReportServiceStub = MeterReportServiceGrpc.newStub(channel);
} else {
meterReportServiceStub = null;
}
this.status = status;
sender.send(meterMap, this);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.profile;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.profile.v3.ProfileTaskGrpc;
import org.apache.skywalking.apm.network.language.profile.v3.ThreadSnapshot;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
/**
 * Sends batches of profiling segment snapshots ({@link TracingThreadSnapshot}) to the
 * backend over a gRPC client stream, then waits for the stream to complete before
 * returning.
 */
@DefaultImplementor
public class ProfileSnapshotSender implements BootService, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(ProfileSnapshotSender.class);
// Last observed channel state; send() is a no-op unless CONNECTED.
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
// Async profile-task stub; rebuilt on reconnect and nulled on disconnect.
private volatile ProfileTaskGrpc.ProfileTaskStub profileTaskStub;
@Override
public void prepare() throws Throwable {
// Subscribe so statusChanged() keeps the stub in step with the channel.
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}
@Override
public void boot() throws Throwable {
}
@Override
public void statusChanged(final GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
profileTaskStub = ProfileTaskGrpc.newStub(channel);
} else {
// Drop the stub on disconnect so a stale channel is never used.
profileTaskStub = null;
}
this.status = status;
}
/**
 * Streams each snapshot in {@code buffer} to the collector and blocks until the
 * server half of the stream finishes; any failure is logged, never rethrown.
 */
public void send(List<TracingThreadSnapshot> buffer) {
if (status == GRPCChannelStatus.CONNECTED) {
try {
// NOTE: this local shadows the channel-status field; it tracks completion
// of this one stream only.
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<ThreadSnapshot> snapshotStreamObserver = profileTaskStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collectSnapshot(
new StreamObserver<Commands>() {
@Override
public void onNext(
Commands commands) {
}
@Override
public void onError(
Throwable throwable) {
status.finished();
if (logger.isErrorEnable()) {
logger.error(
throwable,
"Send profile segment snapshot to collector fail with a grpc internal exception."
);
}
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
}
);
// Convert and push every buffered snapshot onto the stream.
for (TracingThreadSnapshot snapshot : buffer) {
final ThreadSnapshot transformSnapshot = snapshot.transform();
snapshotStreamObserver.onNext(transformSnapshot);
}
snapshotStreamObserver.onCompleted();
// Block until onError/onCompleted above marks the stream finished.
status.wait4Finish();
} catch (Throwable t) {
logger.error(t, "Send profile segment snapshot to backend fail.");
}
}
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
}
}
\ No newline at end of file
......@@ -21,8 +21,8 @@ package org.apache.skywalking.apm.agent.core.profile;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -39,12 +39,10 @@ import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.profile.v3.ProfileTaskCommandQuery;
import org.apache.skywalking.apm.network.language.profile.v3.ProfileTaskFinishReport;
import org.apache.skywalking.apm.network.language.profile.v3.ProfileTaskGrpc;
import org.apache.skywalking.apm.network.language.profile.v3.ThreadSnapshot;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
......@@ -64,7 +62,7 @@ public class ProfileTaskChannelService implements BootService, Runnable, GRPCCha
// gRPC stub
private volatile ProfileTaskGrpc.ProfileTaskBlockingStub profileTaskBlockingStub;
private volatile ProfileTaskGrpc.ProfileTaskStub profileTaskStub;
// segment snapshot sender
private final BlockingQueue<TracingThreadSnapshot> snapshotQueue = new LinkedBlockingQueue<>(
......@@ -74,6 +72,8 @@ public class ProfileTaskChannelService implements BootService, Runnable, GRPCCha
// query task list schedule
private volatile ScheduledFuture<?> getTaskListFuture;
private ProfileSnapshotSender sender;
@Override
public void run() {
if (status == GRPCChannelStatus.CONNECTED) {
......@@ -81,8 +81,7 @@ public class ProfileTaskChannelService implements BootService, Runnable, GRPCCha
ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();
// sniffer info
builder.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME);
builder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);
// last command create time
builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class)
......@@ -120,6 +119,8 @@ public class ProfileTaskChannelService implements BootService, Runnable, GRPCCha
@Override
public void boot() {
sender = ServiceManager.INSTANCE.findService(ProfileSnapshotSender.class);
if (Config.Profile.ACTIVE) {
// query task list
getTaskListFuture = Executors.newSingleThreadScheduledExecutor(
......@@ -135,7 +136,13 @@ public class ProfileTaskChannelService implements BootService, Runnable, GRPCCha
new DefaultNamedThreadFactory("ProfileSendSnapshotService")
).scheduleWithFixedDelay(
new RunnableWithExceptionProtection(
new SnapshotSender(),
() -> {
List<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
snapshotQueue.drainTo(buffer);
if (!buffer.isEmpty()) {
sender.send(buffer);
}
},
t -> logger.error("Profile segment snapshot upload failure.", t)
), 0, 500, TimeUnit.MILLISECONDS
);
......@@ -162,10 +169,8 @@ public class ProfileTaskChannelService implements BootService, Runnable, GRPCCha
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
profileTaskStub = ProfileTaskGrpc.newStub(channel);
} else {
profileTaskBlockingStub = null;
profileTaskStub = null;
}
this.status = status;
}
......@@ -197,62 +202,4 @@ public class ProfileTaskChannelService implements BootService, Runnable, GRPCCha
}
}
/**
* send segment snapshot
*/
private class SnapshotSender implements Runnable {
@Override
public void run() {
if (status == GRPCChannelStatus.CONNECTED) {
try {
ArrayList<TracingThreadSnapshot> buffer = new ArrayList<>(
Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
snapshotQueue.drainTo(buffer);
if (buffer.size() > 0) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<ThreadSnapshot> snapshotStreamObserver = profileTaskStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collectSnapshot(
new StreamObserver<Commands>() {
@Override
public void onNext(
Commands commands) {
}
@Override
public void onError(
Throwable throwable) {
status.finished();
if (logger.isErrorEnable()) {
logger.error(
throwable,
"Send profile segment snapshot to collector fail with a grpc internal exception."
);
}
ServiceManager.INSTANCE.findService(GRPCChannelManager.class)
.reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
}
);
for (TracingThreadSnapshot snapshot : buffer) {
final ThreadSnapshot transformSnapshot = snapshot.transform();
snapshotStreamObserver.onNext(transformSnapshot);
}
snapshotStreamObserver.onCompleted();
status.wait4Finish();
}
} catch (Throwable t) {
logger.error(t, "Send profile segment snapshot to backend fail.");
}
}
}
}
}
......@@ -20,11 +20,14 @@ org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
org.apache.skywalking.apm.agent.core.context.ContextManager
org.apache.skywalking.apm.agent.core.sampling.SamplingService
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMMetricsSender
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.ServiceManagementClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
org.apache.skywalking.apm.agent.core.meter.MeterService
\ No newline at end of file
org.apache.skywalking.apm.agent.core.meter.MeterService
org.apache.skywalking.apm.agent.core.meter.MeterSender
\ No newline at end of file
......@@ -58,7 +58,7 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(12));
assertThat(registryService.size(), is(15));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
......@@ -107,7 +107,7 @@ public class ServiceManagerTest {
assertNotNull(service);
List<GRPCChannelListener> listeners = getFieldValue(service, "listeners");
assertEquals(listeners.size(), 6);
assertEquals(listeners.size(), 7);
}
private void assertSamplingService(SamplingService service) {
......
......@@ -67,6 +67,8 @@ public class MeterServiceTest {
private MeterService registryService = new MeterService();
private List<MeterData> upstreamMeters;
private MeterSender sender = new MeterSender();
private MeterReportServiceGrpc.MeterReportServiceImplBase serviceImplBase = new MeterReportServiceGrpc.MeterReportServiceImplBase() {
@Override
public StreamObserver<MeterData> collect(final StreamObserver<Commands> responseObserver) {
......@@ -104,11 +106,14 @@ public class MeterServiceTest {
@Before
public void setUp() throws Throwable {
spy(sender);
spy(registryService);
Whitebox.setInternalState(
registryService, "meterReportServiceStub", MeterReportServiceGrpc.newStub(grpcServerRule.getChannel()));
Whitebox.setInternalState(registryService, "status", GRPCChannelStatus.CONNECTED);
sender, "meterReportServiceStub", MeterReportServiceGrpc.newStub(grpcServerRule.getChannel()));
Whitebox.setInternalState(sender, "status", GRPCChannelStatus.CONNECTED);
Whitebox.setInternalState(registryService, "sender", sender);
upstreamMeters = new ArrayList<>();
}
......
......@@ -87,3 +87,6 @@ logging.level=${SW_LOGGING_LEVEL:INFO}
# mysql plugin configuration
# plugin.mysql.trace_sql_parameters=${SW_MYSQL_TRACE_SQL_PARAMETERS:false}
# Kafka producer configuration
plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>optional-reporter-plugins</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-reporter-plugin</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafk-clients.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<relocations>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>${shade.package}/org.apache.kafka</shadedPattern>
</relocation>
<relocation>
<pattern>common.message</pattern>
<shadedPattern>${shade.package}/common.message</shadedPattern>
</relocation>
<relocation>
<pattern>kafka</pattern>
<shadedPattern>${shade.package}/kafka</shadedPattern>
</relocation>
<relocation>
<pattern>org.slf4j</pattern>
<shadedPattern>${shade.package}/org.slf4j</shadedPattern>
</relocation>
<relocation>
<pattern>org.xerial</pattern>
<shadedPattern>${shade.package}/org.xerial</shadedPattern>
</relocation>
<relocation>
<pattern>net.jpountz</pattern>
<shadedPattern>${shade.package}/net.jpountz</shadedPattern>
</relocation>
<relocation>
<pattern>aix.ppc64</pattern>
<shadedPattern>${shade.package}/aix.ppc64</shadedPattern>
</relocation>
<relocation>
<pattern>com.github.luben</pattern>
<shadedPattern>${shade.package}/com.github.luben</shadedPattern>
</relocation>
<relocation>
<pattern>darwin</pattern>
<shadedPattern>${shade.package}/darwin</shadedPattern>
</relocation>
<relocation>
<pattern>include</pattern>
<shadedPattern>${shade.package}/include</shadedPattern>
</relocation>
<relocation>
<pattern>linux</pattern>
<shadedPattern>${shade.package}/linux</shadedPattern>
</relocation>
<relocation>
<pattern>win</pattern>
<shadedPattern>${shade.package}/win</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<taskdef resource="net/sf/antcontrib/antcontrib.properties"
classpathref="maven.runtime.classpath"/>
<if>
<equals arg1="${project.packaging}" arg2="jar"/>
<then>
<mkdir dir="${optional.reporter.plugins.dest.dir}"/>
<copy
file="${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar"
tofile="${optional.reporter.plugins.dest.dir}/${project.artifactId}-${project.version}.jar"
overwrite="true"/>
</then>
</if>
</tasks>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>ant-contrib</groupId>
<artifactId>ant-contrib</artifactId>
<version>${ant-contrib.version}</version>
<exclusions>
<exclusion>
<groupId>ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant-nodeps</artifactId>
<version>${ant-nodeps.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
/**
 * Keeps compatibility with {@link ContextManagerExtendService}, which normally tracks the gRPC channel
 * status: the Kafka reporter does not manage a gRPC connection itself, so the status is forced to
 * CONNECTED once at startup and never changed afterwards.
 */
@OverrideImplementor(ContextManagerExtendService.class)
public class KafkaContextManagerExtendService extends ContextManagerExtendService {
    @Override
    public void prepare() {
        // Pretend the channel is always connected; data is shipped through the Kafka producer instead.
        statusChanged(GRPCChannelStatus.CONNECTED);
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.jvm.JVMMetricsSender;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetric;
import org.apache.skywalking.apm.network.language.agent.v3.JVMMetricCollection;
/**
 * A reporter that sends JVM metrics data to the Kafka broker, overriding the default gRPC-based
 * {@link JVMMetricsSender}.
 */
@OverrideImplementor(JVMMetricsSender.class)
public class KafkaJVMMetricsSender extends JVMMetricsSender {
    private static final ILog logger = LogManager.getLogger(KafkaJVMMetricsSender.class);

    private KafkaProducer<String, Bytes> producer;
    private String topic;
    // Bounded buffer between the JVM metric provider thread and this sender.
    private BlockingQueue<JVMMetric> queue;
    // Set once the producer is available; guards against sending before boot().
    private volatile boolean running = false;

    @Override
    public void run() {
        // Check `running` BEFORE draining: the previous order drained (and silently discarded)
        // buffered metrics whenever the producer was not ready yet.
        if (running && !queue.isEmpty()) {
            List<JVMMetric> buffer = new ArrayList<>(queue.size());
            queue.drainTo(buffer);
            if (!buffer.isEmpty()) {
                JVMMetricCollection metrics = JVMMetricCollection.newBuilder()
                                                                 .addAllMetrics(buffer)
                                                                 .setService(Config.Agent.SERVICE_NAME)
                                                                 .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                                 .build();
                if (logger.isDebugEnable()) {
                    logger.debug(
                        "JVM metrics reporting, topic: {}, key: {}, length: {}", topic, metrics.getServiceInstance(),
                        buffer.size()
                    );
                }
                // Keyed by instance name so all metrics of one instance land in the same partition.
                producer.send(new ProducerRecord<>(
                    topic,
                    metrics.getServiceInstance(),
                    Bytes.wrap(metrics.toByteArray())
                ));
                producer.flush();
            }
        }
    }

    @Override
    public void prepare() {
        queue = new LinkedBlockingQueue<>(Config.Jvm.BUFFER_SIZE);
        topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS;
    }

    @Override
    public void boot() {
        producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
        running = true;
    }

    @Override
    public void offer(final JVMMetric metric) {
        // When the queue is full, drop the oldest metric to make room for the newest one.
        if (!queue.offer(metric)) {
            queue.poll();
            queue.offer(metric);
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.meter.MeterId;
import org.apache.skywalking.apm.agent.core.meter.MeterSender;
import org.apache.skywalking.apm.agent.core.meter.MeterService;
import org.apache.skywalking.apm.agent.core.meter.transform.MeterTransformer;
import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
/**
 * A reporter that sends meter data to the Kafka broker, overriding the default gRPC-based
 * {@link MeterSender}.
 */
@OverrideImplementor(MeterSender.class)
public class KafkaMeterSender extends MeterSender {
    // Fix: the logger was created for KafkaTraceSegmentServiceClient, mislabeling every log line.
    private static final ILog logger = LogManager.getLogger(KafkaMeterSender.class);

    private String topic;
    private KafkaProducer<String, Bytes> producer;

    @Override
    public void prepare() {
        topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METER;
    }

    @Override
    public void boot() {
        producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
    }

    /**
     * Transforms the given meters into a {@link MeterDataCollection} and publishes it to the meter
     * topic, keyed by the agent instance name.
     */
    public void send(Map<MeterId, MeterTransformer> meterMap, MeterService meterService) {
        MeterDataCollection.Builder builder = MeterDataCollection.newBuilder();
        transform(meterMap, meterData -> {
            if (logger.isDebugEnable()) {
                logger.debug("Meter data reporting, instance: {}", meterData.getServiceInstance());
            }
            builder.addMeterData(meterData);
        });
        producer.send(
            new ProducerRecord<>(topic, Config.Agent.INSTANCE_NAME, Bytes.wrap(builder.build().toByteArray())));
        producer.flush();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
/**
 * Configures, initializes and holds the single {@link KafkaProducer} instance shared by all
 * Kafka reporters. Fails fast at prepare time if any required topic is missing on the broker.
 */
@DefaultImplementor
public class KafkaProducerManager implements BootService, Runnable {
    private KafkaProducer<String, Bytes> producer;

    @Override
    public void prepare() throws Throwable {
        Properties properties = new Properties();
        properties.setProperty(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
        KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);

        // AdminClient is AutoCloseable; close it so its background I/O threads don't leak.
        try (AdminClient adminClient = AdminClient.create(properties)) {
            DescribeTopicsResult topicsResult = adminClient.describeTopics(Arrays.asList(
                KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_MANAGEMENT,
                KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS,
                KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_PROFILING,
                KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT,
                KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METER
            ));
            // Collect every topic whose description failed; those are treated as missing.
            Set<String> missingTopics = topicsResult.values().entrySet().stream()
                                                    .map(entry -> {
                                                        try {
                                                            entry.getValue().get();
                                                            return null;
                                                        } catch (InterruptedException e) {
                                                            // Preserve the interrupt status instead of swallowing it.
                                                            Thread.currentThread().interrupt();
                                                            return entry.getKey();
                                                        } catch (ExecutionException e) {
                                                            return entry.getKey();
                                                        }
                                                    })
                                                    .filter(Objects::nonNull)
                                                    .collect(Collectors.toSet());
            if (!missingTopics.isEmpty()) {
                throw new Exception("These topics " + missingTopics + " don't exist.");
            }
        }

        producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
    }

    @Override
    public void boot() {
    }

    @Override
    public void onComplete() {
    }

    @Override
    public void run() {
    }

    /**
     * Get the KafkaProducer instance to send data to the Kafka broker.
     */
    public final KafkaProducer<String, Bytes> getProducer() {
        return producer;
    }

    @Override
    public void shutdown() {
        // Flush buffered records before closing so shutdown doesn't drop in-flight data.
        producer.flush();
        producer.close();
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender;
import org.apache.skywalking.apm.agent.core.profile.TracingThreadSnapshot;
import org.apache.skywalking.apm.network.language.profile.v3.ThreadSnapshot;
/**
 * Reports tracing profile snapshot data through the Kafka producer. Profiling tasks themselves are
 * still fetched from the OAP server over gRPC; only the snapshot upload path is replaced here.
 */
@OverrideImplementor(ProfileSnapshotSender.class)
public class KafkaProfileSnapshotSender extends ProfileSnapshotSender {
    // Fix: the logger was created for the parent ProfileSnapshotSender, mislabeling log lines.
    private static final ILog logger = LogManager.getLogger(KafkaProfileSnapshotSender.class);

    private String topic;
    private KafkaProducer<String, Bytes> producer;

    @Override
    public void prepare() {
        topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_PROFILING;
    }

    @Override
    public void boot() {
        producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
    }

    @Override
    public void send(final List<TracingThreadSnapshot> buffer) {
        for (TracingThreadSnapshot snapshot : buffer) {
            final ThreadSnapshot object = snapshot.transform();
            if (logger.isDebugEnable()) {
                // Fix: the format string has four placeholders but only three arguments were
                // supplied, shifting every logged value by one; the topic argument was missing.
                logger.debug("Thread snapshot reporting, topic: {}, taskId: {}, sequence: {}, traceId: {}",
                             topic, object.getTaskId(), object.getSequence(), object.getTraceSegmentId()
                );
            }
            // Keyed by taskId + sequence so snapshots of one task keep their relative order per partition.
            producer.send(new ProducerRecord<>(
                topic,
                object.getTaskId() + object.getSequence(),
                Bytes.wrap(object.toByteArray())
            ));
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.agent.core.boot.PluginConfig;
/**
 * Static configuration holder for the Kafka reporter plugin; values are populated from the agent
 * configuration via {@link PluginConfig}.
 */
public class KafkaReporterPluginConfig {
    public static class Plugin {
        @PluginConfig(root = KafkaReporterPluginConfig.class)
        public static class Kafka {
            /**
             * <B>bootstrap.servers</B>: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
             * This list should be in the form host1:port1,host2:port2,...
             */
            public static String BOOTSTRAP_SERVERS = "localhost:9092";

            // Topic for JVM metrics data.
            public static String TOPIC_METRICS = "skywalking-metrics";

            // Topic for profiling thread snapshot data.
            public static String TOPIC_PROFILING = "skywalking-profilings";

            // Topic for tracing segment data.
            public static String TOPIC_SEGMENT = "skywalking-segments";

            // Topic for service management data (instance registration and heartbeats).
            public static String TOPIC_MANAGEMENT = "skywalking-managements";

            // Topic for meter data.
            public static String TOPIC_METER = "skywalking-meters";

            // Extra key/value pairs passed verbatim into the Kafka producer configuration.
            public static Map<String, String> PRODUCER_CONFIG = new HashMap<>();
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
import org.apache.skywalking.apm.agent.core.remote.ServiceManagementClient;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.management.v3.InstancePingPkg;
import org.apache.skywalking.apm.network.management.v3.InstanceProperties;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
/**
* A service management data(Instance registering properties and Instance pinging) reporter.
*/
@OverrideImplementor(ServiceManagementClient.class)
public class KafkaServiceManagementServiceClient implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(KafkaServiceManagementServiceClient.class);
private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES;
private static final String TOPIC_KEY_REGISTER = "register-";
private ScheduledFuture<?> heartbeatFuture;
private KafkaProducer<String, Bytes> producer;
private String topic;
@Override
public void prepare() {
topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_MANAGEMENT;
SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();
for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
.setKey(key)
.setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
.build());
}
Config.Agent.INSTANCE_NAME = StringUtil.isEmpty(Config.Agent.INSTANCE_NAME)
? UUID.randomUUID().toString().replaceAll("-", "") + "@" + OSUtil.getIPV4()
: Config.Agent.INSTANCE_NAME;
}
@Override
public void boot() {
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("ServiceManagementClientKafkaProducer")
).scheduleAtFixedRate(new RunnableWithExceptionProtection(
this,
t -> logger.error("unexpected exception.", t)
), 0, Config.Collector.HEARTBEAT_PERIOD, TimeUnit.SECONDS);
InstanceProperties instance = InstanceProperties.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.addAllProperties(OSUtil.buildOSInfo(
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.build();
producer.send(new ProducerRecord<>(topic, TOPIC_KEY_REGISTER + instance.getServiceInstance(), Bytes.wrap(instance.toByteArray())));
producer.flush();
}
@Override
public void run() {
InstancePingPkg ping = InstancePingPkg.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build();
if (logger.isDebugEnable()) {
logger.debug("Heartbeat reporting, instance: {}", ping.getServiceInstance());
}
producer.send(new ProducerRecord<>(ping.getServiceInstance(), Bytes.wrap(ping.toByteArray())));
}
@Override
public void onComplete() {
}
@Override
public void shutdown() {
heartbeatFuture.cancel(true);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
/**
 * A tracing segment reporter which publishes finished {@link TraceSegment}s to the Kafka segment
 * topic, keyed by the segment id.
 */
@OverrideImplementor(TraceSegmentServiceClient.class)
public class KafkaTraceSegmentServiceClient implements BootService, TracingContextListener {
    private static final ILog logger = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);

    private String topic;
    private KafkaProducer<String, Bytes> producer;

    @Override
    public void prepare() {
        topic = KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_SEGMENT;
    }

    @Override
    public void boot() {
        producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
    }

    @Override
    public void onComplete() {
        // Register as a listener only after all services finished booting, so the producer is ready.
        TracingContext.ListenerManager.add(this);
    }

    @Override
    public void shutdown() {
        TracingContext.ListenerManager.remove(this);
    }

    @Override
    public void afterFinished(final TraceSegment traceSegment) {
        // Fix: check the ignore flag first; previously "Trace segment reporting" was logged even
        // for segments that were then dropped, making the debug log misleading.
        if (traceSegment.isIgnore()) {
            logger.debug("Trace[TraceId={}] is ignored.", traceSegment.getTraceSegmentId());
            return;
        }
        if (logger.isDebugEnable()) {
            logger.debug("Trace segment reporting, traceId: {}", traceSegment.getTraceSegmentId());
        }
        SegmentObject upstreamSegment = traceSegment.transform();
        producer.send(new ProducerRecord<>(
            topic,
            upstreamSegment.getTraceSegmentId(),
            Bytes.wrap(upstreamSegment.toByteArray())
        ));
    }
}
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManager
org.apache.skywalking.apm.agent.core.kafka.KafkaContextManagerExtendService
org.apache.skywalking.apm.agent.core.kafka.KafkaServiceManagementServiceClient
org.apache.skywalking.apm.agent.core.kafka.KafkaJVMMetricsSender
org.apache.skywalking.apm.agent.core.kafka.KafkaTraceSegmentServiceClient
org.apache.skywalking.apm.agent.core.kafka.KafkaProfileSnapshotSender
org.apache.skywalking.apm.agent.core.kafka.KafkaMeterSender
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sniffer</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>optional-reporter-plugins</artifactId>
<packaging>pom</packaging>
<modules>
<module>kafka-reporter-plugin</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<agent.package.dest.dir>${project.build.directory}/../../../../skywalking-agent</agent.package.dest.dir>
<optional.reporter.plugins.dest.dir>${agent.package.dest.dir}/optional-reporter-plugins</optional.reporter.plugins.dest.dir>
<ant-contrib.version>1.0b3</ant-contrib.version>
<ant-nodeps.version>1.8.1</ant-nodeps.version>
<kafk-clients.version>2.4.1</kafk-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-util</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-test-tools</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -36,6 +36,7 @@
<module>apm-test-tools</module>
<module>bootstrap-plugins</module>
<module>optional-plugins</module>
<module>optional-reporter-plugins</module>
</modules>
<properties>
......
......@@ -336,6 +336,10 @@ The text of each license is the standard Apache 2.0 license.
jose4j 0.7.0: https://mvnrepository.com/artifact/org.bitbucket.b_c/jose4j, Apache 2.0
converter-moshi 2.5.0: https://mvnrepository.com/artifact/com.squareup.retrofit2/converter-moshi, Apache 2.0
vavr 0.10.3: https://github.com/vavr-io/vavr, Apache 2.0
kafka-clients 2.4.1: https://github.com/apache/kafka, Apache 2.0
lz4-java 1.6.0: https://github.com/jpountz/lz4-java, Apache 2.0
snappy-java 1.1.7.3: https://github.com/xerial/snappy-java, Apache 2.0
slf4j-api 1.7.28: http://www.slf4j.org, Apache 2.0
========================================================================
MIT licenses
......@@ -380,6 +384,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
proto files from gogo: https://github.com/gogo/protobuf BSD-2
proto files from grpc-gateway, https://github.com/grpc-ecosystem/grpc-gateway/tree/master/protoc-gen-swagger/options BSD-3
zstd-jni 1.4.3-1: https://github.com/luben/zstd-jni, BSD-3-Clause
========================================================================
MPL 2.0 licenses
......
......@@ -879,3 +879,20 @@ This product includes the software developed by third-party:
* sbt-extras: https://github.com/paulp/sbt-extras (BSD) (LICENSE.sbt-extras.txt)
========================================================================
------
===========================================================================
Apache Kafka Notice
===========================================================================
Apache Kafka
Copyright 2020 The Apache Software Foundation.
This product includes software developed at
The Apache Software Foundation (https://www.apache.org/).
This distribution has a binary dependency on jersey, which is available under the CDDL
License. The source code of jersey can be found at https://github.com/jersey/jersey/.
========================================================================
\ No newline at end of file
Zstd-jni: JNI bindings to Zstd Library
Copyright (c) 2015-present, Luben Karavelov/ All rights reserved.
BSD License
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
......@@ -160,8 +160,8 @@ The value should be an integer. The response code of OAL entities is according t
#### Tag key `db.statement` and `db.type`.
The value of `db.statement` should be a String, representing the Database statement, such as SQL, or `[No statement]/`+span#operationName if value is empty.
When exit span has this tag, OAP samples the slow statements based on `receiver-trace/default/maxSlowSQLLength`.
The threshold of slow statement is defined by following [`receiver-trace/default/slowDBAccessThreshold`](../setup/backend/slow-db-statement.md)
When exit span has this tag, OAP samples the slow statements based on `agent-analyzer/default/maxSlowSQLLength`.
The threshold of slow statement is defined by following [`agent-analyzer/default/slowDBAccessThreshold`](../setup/backend/slow-db-statement.md)
#### Extension logic endpoint. Tag key `x-le`
Logic endpoint is a concept, which doesn't represent a real RPC call, but requires the statistic.
......
......@@ -25,7 +25,7 @@ Generic placeholders are defined as follows:
* `<duration>`: a duration This will parse a textual representation of a duration. The formats accepted are based on
the ISO-8601 duration format `PnDTnHnMn.nS` with days considered to be exactly 24 hours.
* `<labelname>`: a string matching the regular expression [a-zA-Z_][a-zA-Z0-9_]*
* `<labelname>`: a string matching the regular expression \[a-zA-Z_\]\[a-zA-Z0-9_\]*
* `<labelvalue>`: a string of unicode characters
* `<host>`: a valid string consisting of a hostname or IP followed by an optional port number
* `<path>`: a valid URL path
......@@ -85,3 +85,51 @@ that adds the procedure from raw data to minute rate.
When you specify `avgHistogram` and `avgHistogramPercentile`, the source should be the type of `histogram`. A counterFunction
is also needed due to the `bucket`, `sum` and `count` of histogram are counters.
## Kafka Fetcher
Kafka Fetcher pulls messages from Kafka Broker(s) that the Agent delivered. Check the agent documentation for details. Typically, Tracing Segments, Service/Instance properties, JVM Metrics, and Meter system data are supported. Kafka Fetcher can work with gRPC/HTTP Receivers at the same time, adopting different transport protocols.
Kafka Fetcher is disabled by default; configure it as follows to enable it.
```yaml
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
```
`skywalking-segments`, `skywalking-metrics`, `skywalking-profile`, `skywalking-managements` and `skywalking-meters` topics are required by `kafka-fetcher`.
If they do not exist, Kafka Fetcher will create them by default. You can also create them yourself before the OAP server starts.
When using the OAP server's automatic creation mechanism, you can modify the number of partitions and replications of the topics through the following configurations:
```yaml
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
```
In cluster mode, all topics have the same number of partitions. Then we have to set `"isSharding"` to `"true"` and assign the partitions to consume for OAP server. The OAP server can use commas to separate multiple partitions.
Kafka Fetcher allows configuring all the Kafka consumer settings listed [here](http://kafka.apache.org/24/documentation.html#consumerconfigs) in the property `kafkaConsumerConfig`. For example:
```yaml
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:-}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:true}
consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:1,3,5}
kafkaConsumerConfig:
enable.auto.commit: true
...
```
......@@ -2,10 +2,22 @@
Meter receiver is accepting the metrics of [meter protocol](https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Meter.proto) format into the [Meter System](./../../concepts-and-designs/meter.md).
## Module define
```yaml
receiver-meter:
selector: ${SW_RECEIVER_METER:default}
default:
```
In Kafka Fetcher, we need to follow the configuration to enable it.
```yaml
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:true}
```
## Configuration file
Meter receiver is configured via a configuration file. The configuration file defines everything related to receiving
from agents, as well as which rule files to load.
......
# Choose receiver
Receiver is a concept in SkyWalking backend. All modules, which are responsible for receiving telemetry
or tracing data from other being monitored system, are all being called **Receiver**. If you are looking for the pull mode,
......@@ -13,7 +14,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **receiver-profile**. gRPC services accept profile task status and snapshot reporter.
1. **receiver_zipkin**. See [details](#zipkin-receiver).
1. **receiver_jaeger**. See [details](#jaeger-receiver).
1. **receiver-oc**. See [details](#oc-receiver).
1. **receiver-oc**. See [details](#opencensus-receiver).
1. **receiver-meter**. See [details](backend-meter.md).
The sample settings of these receivers should be already in default `application.yml`, and also list here
......@@ -25,8 +26,6 @@ receiver-register:
receiver-trace:
selector: ${SW_RECEIVER_TRACE:default}
default:
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
selector: ${SW_RECEIVER_JVM:default}
......
......@@ -125,6 +125,9 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | actions | The number of actions to collect. | SW_STORAGE_INFLUXDB_ACTIONS | 1000 |
| - | - | duration | The time to wait at most (milliseconds). | SW_STORAGE_INFLUXDB_DURATION | 1000|
| - | - | fetchTaskLogMaxSize | The max number of fetch task log in a request. | SW_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE | 5000|
| agent-analyzer | default | Agent Analyzer. | SW_AGENT_ANALYZER | default |
| - | -| sampleRate|Sampling rate for receiving trace. The precision is 1/10000. 10000 means 100% sample in default.|SW_TRACE_SAMPLE_RATE|10000|
| - | - |slowDBAccessThreshold|The slow database access thresholds. Unit ms.|SW_SLOW_DB_THRESHOLD|default:200,mongodb:100|
| receiver-sharing-server|default| Sharing server provides new gRPC and restful servers for data collection. Ana make the servers in the core module working for internal communication only.| - | - |
| - | - | restHost| Binding IP of restful service. Services include GraphQL query and HTTP data report| - | - |
| - | - | restPort | Binding port of restful service | - | - |
......@@ -139,8 +142,6 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | maxConcurrentCallsPerConnection | The maximum number of concurrent calls permitted for each incoming connection. Defaults to no limit. | - | - |
| receiver-register|default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-trace|default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| - | - |sampleRate|Sampling rate for receiving trace. The precision is 1/10000. 10000 means 100% sample in default.|SW_TRACE_SAMPLE_RATE|10000|
| - | - |slowDBAccessThreshold|The slow database access thresholds. Unit ms.|SW_SLOW_DB_THRESHOLD|default:200,mongodb:100|
| receiver-jvm| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-clr| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
| receiver-profile| default| Read [receiver doc](backend-receivers.md) for more details | - | - |
......@@ -172,6 +173,19 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | maxMessageSize | Sets the maximum message size allowed to be received on the server. Empty means 4 MiB | - | 4M(based on Netty) |
| prometheus-fetcher | default | Read [fetcher doc](backend-fetcher.md) for more details | - | - |
| - | - | active | Activate the Prometheus fetcher. | SW_PROMETHEUS_FETCHER_ACTIVE | false |
| kafka-fetcher | default | Read [fetcher doc](backend-fetcher.md) for more details | - | - |
| - | - | bootstrapServers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | SW_KAFKA_FETCHER_SERVERS | localhost:9092 |
| - | - | groupId | A unique string that identifies the consumer group this consumer belongs to.| - | skywalking-consumer |
| - | - | consumePartitions | Which PartitionId(s) of the topics assign to the OAP server. If more than one, is separated by commas. | SW_KAFKA_FETCHER_CONSUME_PARTITIONS | - |
| - | - | isSharding | Set to true when the OAP Server runs in cluster mode. | SW_KAFKA_FETCHER_IS_SHARDING | false |
| - | - | createTopicIfNotExist | If true, create the Kafka topic when it does not exist. | - | true |
| - | - | partitions | The number of partitions for the topic being created. | SW_KAFKA_FETCHER_PARTITIONS | 3 |
| - | - | enableMeterSystem | To enable to fetch and handle [Meter System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false
| - | - | replicationFactor | The replication factor for each partition in the topic being created. | SW_KAFKA_FETCHER_PARTITIONS_FACTOR | 2 |
| - | - | topicNameOfMetrics | Specifying Kafka topic name for Metrics data. | - | skywalking-metrics |
| - | - | topicNameOfProfiling | Specifying Kafka topic name for Profiling data. | - | skywalking-profile |
| - | - | topicNameOfTracingSegments | Specifying Kafka topic name for Tracing data. | - | skywalking-segments |
| - | - | topicNameOfManagements | Specifying Kafka topic name for service instance reporting and registering. | - | skywalking-managements |
| query | graphql | - | GraphQL query implementation | - |
| - | - | path | Root path of GraphQL query and mutation. | SW_QUERY_GRAPHQL_PATH | /graphql|
| alarm | default | - | Read [alarm doc](backend-alarm.md) for more details. | - |
......
......@@ -6,12 +6,12 @@ Right now, SkyWalking supports following dynamic configurations.
| Config Key | Value Description | Value Format Example |
|:----:|:----:|:----:|
|receiver-trace.default.slowDBAccessThreshold| Thresholds of slow Database statement, override `receiver-trace/default/slowDBAccessThreshold` of `applciation.yml`. | default:200,mongodb:50|
|receiver-trace.default.uninstrumentedGateways| The uninstrumented gateways, override `gateways.yml`. | same as [`gateways.yml`](uninstrumented-gateways.md#configuration-format) |
|agent-analyzer.default.slowDBAccessThreshold| Thresholds of slow Database statement, override `agent-analyzer/default/slowDBAccessThreshold` of `application.yml`. | default:200,mongodb:50|
|agent-analyzer.default.uninstrumentedGateways| The uninstrumented gateways, override `gateways.yml`. | same as [`gateways.yml`](uninstrumented-gateways.md#configuration-format) |
|alarm.default.alarm-settings| The alarm settings, will override `alarm-settings.yml`. | same as [`alarm-settings.yml`](backend-alarm.md) |
|core.default.apdexThreshold| The apdex threshold settings, will override `service-apdex-threshold.yml`. | same as [`service-apdex-threshold.yml`](apdex-threshold.md) |
|core.default.endpoint-name-grouping| The endpoint name grouping setting, will override `endpoint-name-grouping.yml`. | same as [`endpoint-name-grouping.yml`](endpoint-grouping-rules.md) |
|receiver-trace.default.sampleRate| Trace sampling , override `receiver-trace/default/sampleRate` of `applciation.yml`. | 10000 |
|agent-analyzer.default.sampleRate| Trace sampling, override `agent-analyzer/default/sampleRate` of `application.yml`. | 10000 |
This feature depends on upstream service, so it is **DISABLED** by default.
......
......@@ -8,10 +8,10 @@ segments have been collected and reported by agents, the backend would do their
to understand why we called it `as consistent as possible` and `do their best to don't break the trace`.
## Set the sample rate
In **receiver-trace** receiver, you will find `sampleRate` setting.
In **agent-analyzer** module, you will find `sampleRate` setting.
```yaml
receiver-trace:
agent-analyzer:
default:
...
sampleRate: ${SW_TRACE_SAMPLE_RATE:1000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
......
# How to enable Kafka Reporter
The Kafka reporter plugin supports reporting traces, JVM metrics, Instance Properties, and profiled snapshots to a Kafka cluster. It is disabled by default; move the plugin jar from `optional-reporter-plugins` to `reporter-plugins` to activate it.
Note that, currently, the agent still needs to configure the gRPC receiver for delivering profiling tasks. In other words, the following configuration cannot be omitted.
```properties
# Backend service addresses.
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
# Kafka producer configuration
plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
plugin.kafka.producer_config[delivery.timeout.ms]=12000
```
The Kafka reporter plugin supports customizing all the configurations listed [here](http://kafka.apache.org/24/documentation.html#producerconfigs).
After you activated the Kafka reporter, you need to open [Kafka fetcher](../../backend/backend-fetcher.md#kafka-fetcher) at the OAP side.
\ No newline at end of file
......@@ -134,6 +134,8 @@ property key | Description | Default |
`plugin.influxdb.trace_influxql`|If true, trace all the influxql(query and write) in InfluxDB access, default is true.|`true`|
`correlation.element_max_number`|Max element count of the correlation context.|`3`|
`correlation.value_max_length`|Max value length of correlation context element.|`128`|
`plugin.kafka.bootstrap_servers`| A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | `localhost:9092`
`plugin.kafka.producer_config`| Kafka producer configuration. |
## Optional Plugins
Java agent plugins are all pluggable. Optional plugins could be provided in `optional-plugins` folder under agent or 3rd party repositories.
......@@ -173,6 +175,11 @@ Now, we have the following known bootstrap plugins.
* If you want to use OpenTracing Java APIs, try [SkyWalking OpenTracing compatible tracer](Opentracing.md). More details you could find at http://opentracing.io
* If you want to specify the path of your agent.config file. Read [set config file through system properties](Specified-agent-config.md)
## Advanced Reporters
Advanced reporters provide an alternative way to submit the agent-collected data to the backend. All of them are in the `optional-reporter-plugins` folder; move the one you need into the `reporter-plugins` folder to activate it. **Notice: don't activate multiple reporters at the same time, as that could cause unexpected fatal errors.**
* Use Kafka to transport the traces, JVM metrics, instance properties, and profiled snapshots to the backend. Read the [How to enable Kafka Reporter](How-to-enable-kafka-reporter.md) for more details.
## Plugin Development Guide
SkyWalking java agent supports plugin to extend [the supported list](Supported-list.md). Please follow
our [Plugin Development Guide](../../../guides/Java-Plugin-Development-Guide.md).
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>analyzer</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>agent-analyzer</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.module;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
 * Module definition of the agent data analyzer. It exposes the segment
 * parsing and meter processing services to other OAP modules.
 */
public class AnalyzerModule extends ModuleDefine {
    /** The module name used in {@code application.yml} ({@code agent-analyzer}). */
    public static final String NAME = "agent-analyzer";

    public AnalyzerModule() {
        super(NAME);
    }

    @Override
    public Class[] services() {
        // Services this module provides to the rest of the OAP server.
        final Class[] providedServices = {
            ISegmentParserService.class,
            IMeterProcessService.class
        };
        return providedServices;
    }
}
......@@ -16,15 +16,18 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
package org.apache.skywalking.oap.server.analyzer.provider;
import java.util.Collections;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.analyzer.provider.trace.DBLatencyThresholdsAndWatcher;
import org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSampleRateWatcher;
import org.apache.skywalking.oap.server.analyzer.provider.trace.UninstrumentedGatewaysConfig;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class TraceServiceModuleConfig extends ModuleConfig {
public class AnalyzerModuleConfig extends ModuleConfig {
/**
* The sample rate precision is 1/10000. 10000 means 100% sample in default.
*/
......@@ -70,4 +73,7 @@ public class TraceServiceModuleConfig extends ModuleConfig {
@Setter
@Getter
private int maxSlowSQLLength = 2000;
@Getter
private final String configPath = "meter-receive-config";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.provider;
import java.util.List;
import lombok.Getter;
import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfigs;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessService;
import org.apache.skywalking.oap.server.analyzer.provider.trace.DBLatencyThresholdsAndWatcher;
import org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSampleRateWatcher;
import org.apache.skywalking.oap.server.analyzer.provider.trace.UninstrumentedGatewaysConfig;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.ISegmentParserService;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SegmentParserListenerManager;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SegmentParserServiceImpl;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.MultiScopesAnalysisListener;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.NetworkAddressAliasMappingListener;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.SegmentAnalysisListener;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.oal.rt.CoreOALDefine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
/**
 * The default {@link ModuleProvider} of the analyzer module. It wires up the trace-segment
 * analysis service and the meter analysis service, and registers the module's dynamic
 * configuration watchers with the configuration module.
 *
 * <p>Lifecycle: {@link #prepare()} builds the watchers and services and registers the
 * service implementations; {@link #start()} loads the official OAL definitions, registers
 * the watchers for dynamic updates, and activates the parsers/processors.
 */
public class AnalyzerModuleProvider extends ModuleProvider {
    @Getter
    private final AnalyzerModuleConfig moduleConfig;
    // Watcher for the "slowDBAccessThreshold" dynamic configuration key.
    @Getter
    private DBLatencyThresholdsAndWatcher thresholds;
    // Watcher for the "uninstrumentedGateways" dynamic configuration key.
    @Getter
    private UninstrumentedGatewaysConfig uninstrumentedGatewaysConfig;
    // Implementation backing ISegmentParserService; its listener manager is injected in start().
    @Getter
    private SegmentParserServiceImpl segmentParserService;
    // Watcher for the "sampleRate" dynamic configuration key.
    @Getter
    private TraceSampleRateWatcher traceSampleRateWatcher;
    // Meter analysis rules loaded from moduleConfig.getConfigPath(); handed to processService in start().
    private List<MeterConfig> meterConfigs;
    @Getter
    private MeterProcessService processService;
    public AnalyzerModuleProvider() {
        this.moduleConfig = new AnalyzerModuleConfig();
    }
    @Override
    public String name() {
        // Provider name as selected in the backend configuration (application.yml).
        return "default";
    }
    @Override
    public Class<? extends ModuleDefine> module() {
        return AnalyzerModule.class;
    }
    @Override
    public ModuleConfig createConfigBeanIfAbsent() {
        return moduleConfig;
    }
    /**
     * Builds the config watchers and service implementations. Note the ordering: the
     * watchers are created and pushed into {@code moduleConfig} before the segment parser
     * service is constructed, so the parser sees a fully-populated config.
     */
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        thresholds = new DBLatencyThresholdsAndWatcher(moduleConfig.getSlowDBAccessThreshold(), this);
        uninstrumentedGatewaysConfig = new UninstrumentedGatewaysConfig(this);
        traceSampleRateWatcher = new TraceSampleRateWatcher(this);
        moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);
        moduleConfig.setUninstrumentedGatewaysConfig(uninstrumentedGatewaysConfig);
        moduleConfig.setTraceSampleRateWatcher(traceSampleRateWatcher);
        segmentParserService = new SegmentParserServiceImpl(getManager(), moduleConfig);
        this.registerServiceImplementation(ISegmentParserService.class, segmentParserService);
        meterConfigs = MeterConfigs.loadConfig(moduleConfig.getConfigPath());
        processService = new MeterProcessService(getManager());
        this.registerServiceImplementation(IMeterProcessService.class, processService);
    }
    /**
     * Loads the official OAL definitions, registers the dynamic-config watchers, and
     * activates the segment parser and meter processor with the data prepared in
     * {@link #prepare()}.
     */
    @Override
    public void start() throws ModuleStartException {
        // load official analysis
        getManager().find(CoreModule.NAME)
                    .provider()
                    .getService(OALEngineLoaderService.class)
                    .load(CoreOALDefine.INSTANCE);
        DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
                                                                              .provider()
                                                                              .getService(
                                                                                  DynamicConfigurationService.class);
        dynamicConfigurationService.registerConfigChangeWatcher(thresholds);
        dynamicConfigurationService.registerConfigChangeWatcher(uninstrumentedGatewaysConfig);
        dynamicConfigurationService.registerConfigChangeWatcher(traceSampleRateWatcher);
        segmentParserService.setListenerManager(listenerManager());
        processService.start(meterConfigs);
    }
    @Override
    public void notifyAfterCompleted() {
    }
    @Override
    public String[] requiredModules() {
        return new String[] {
            TelemetryModule.NAME,
            CoreModule.NAME,
            ConfigurationModule.NAME
        };
    }
    /**
     * Assembles the segment-analysis listener factories. The multi-scope and
     * network-address-alias listeners are only installed when trace analysis is enabled;
     * the {@link SegmentAnalysisListener} factory is added unconditionally.
     */
    private SegmentParserListenerManager listenerManager() {
        SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
        if (moduleConfig.isTraceAnalysis()) {
            listenerManager.add(new MultiScopesAnalysisListener.Factory(getManager()));
            listenerManager.add(new NetworkAddressAliasMappingListener.Factory(getManager()));
        }
        listenerManager.add(new SegmentAnalysisListener.Factory(getManager(), moduleConfig));
        return listenerManager;
    }
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.jvm.provider.handler;
package org.apache.skywalking.oap.server.analyzer.provider.jvm;
import java.util.List;
import org.apache.skywalking.apm.network.common.v3.CPU;
......@@ -27,10 +27,10 @@ import org.apache.skywalking.apm.network.language.agent.v3.MemoryPool;
import org.apache.skywalking.apm.network.language.agent.v3.Thread;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.source.GCPhrase;
import org.apache.skywalking.oap.server.core.source.MemoryPoolType;
import org.apache.skywalking.oap.server.core.analysis.NodeType;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMCPU;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMGC;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMMemory;
......@@ -49,7 +49,7 @@ public class JVMSourceDispatcher {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
}
void sendMetric(String service, String serviceInstance, JVMMetric metrics) {
public void sendMetric(String service, String serviceInstance, JVMMetric metrics) {
long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
final String serviceId = IDManager.ServiceID.buildId(service, NodeType.Normal);
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.config;
package org.apache.skywalking.oap.server.analyzer.provider.meter.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.config;
package org.apache.skywalking.oap.server.analyzer.provider.meter.config;
import lombok.Data;
import org.apache.skywalking.oap.server.core.analysis.meter.function.MeterFunction;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.config;
package org.apache.skywalking.oap.server.analyzer.provider.meter.config;
import lombok.Data;
import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import io.vavr.Tuple;
import io.vavr.Tuple2;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import lombok.Data;
import org.apache.skywalking.apm.network.language.agent.v3.Label;
......
......@@ -16,13 +16,17 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import java.util.List;
import org.apache.skywalking.oap.server.library.module.Service;
@Getter
public class MeterReceiverConfig extends ModuleConfig {
public interface IMeterProcessService extends Service {
void initMeters();
MeterProcessor createProcessor();
List<MeterBuilder> enabledBuilders();
private final String configPath = "meter-receive-config";
}
......@@ -16,18 +16,18 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import groovy.lang.GroovyShell;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.Scope;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.BucketedValues;
import org.apache.skywalking.oap.server.receiver.meter.provider.config.MeterConfig;
import org.apache.skywalking.oap.server.receiver.meter.provider.config.Scope;
import java.util.Map;
import java.util.StringJoiner;
......
......@@ -16,26 +16,34 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.meter.provider.config.MeterConfig;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Manages all of the meter builders.
 */
public class MeterProcessContext {
public class MeterProcessService implements IMeterProcessService {
private List<MeterBuilder> meterBuilders;
private final ModuleManager manager;
private final List<MeterBuilder> meterBuilders;
private volatile boolean started = false;
public MeterProcessService(ModuleManager manager) {
this.manager = manager;
}
public MeterProcessContext(List<MeterConfig> meterBuilders, ModuleManager manager) {
public void start(List<MeterConfig> meterBuilders) {
final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
this.meterBuilders = meterBuilders.stream().map(c -> new MeterBuilder(c, meterSystem)).collect(Collectors.toList());
this.meterBuilders = meterBuilders.stream()
.map(c -> new MeterBuilder(c, meterSystem))
.collect(Collectors.toList());
}
/**
......@@ -48,14 +56,17 @@ public class MeterProcessContext {
/**
* Init all meters.
*/
public void initMeters() {
meterBuilders.stream().forEach(MeterBuilder::initMeter);
public synchronized void initMeters() {
if (!started) {
meterBuilders.stream().forEach(MeterBuilder::initMeter);
started = true;
}
}
/**
* Getting enabled builders.
*/
List<MeterBuilder> enabledBuilders() {
public List<MeterBuilder> enabledBuilders() {
return meterBuilders.stream().filter(MeterBuilder::hasInit).collect(Collectors.toList());
}
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
......@@ -41,7 +41,7 @@ public class MeterProcessor {
/**
* Process context.
*/
private final MeterProcessContext context;
private final MeterProcessService processService;
/**
* All of meters has been read. Using it to process groovy script.
......@@ -63,8 +63,8 @@ public class MeterProcessor {
*/
private Long timestamp;
public MeterProcessor(MeterProcessContext context) {
this.context = context;
public MeterProcessor(MeterProcessService processService) {
this.processService = processService;
}
public void read(MeterData data) {
......@@ -107,7 +107,7 @@ public class MeterProcessor {
}
// Get all meter builders.
final List<MeterBuilder> enabledBuilders = context.enabledBuilders();
final List<MeterBuilder> enabledBuilders = processService.enabledBuilders();
if (CollectionUtils.isEmpty(enabledBuilders)) {
return;
}
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
......
......@@ -16,21 +16,22 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
package org.apache.skywalking.oap.server.analyzer.provider.trace;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
public class DBLatencyThresholdsAndWatcher extends ConfigChangeWatcher {
private AtomicReference<Map<String, Integer>> thresholds;
private AtomicReference<String> settingsString;
public DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) {
super(TraceModule.NAME, provider, "slowDBAccessThreshold");
public DBLatencyThresholdsAndWatcher(String config, ModuleProvider provider) {
super(AnalyzerModule.NAME, provider, "slowDBAccessThreshold");
thresholds = new AtomicReference<>(new HashMap<>());
settingsString = new AtomicReference<>(Const.EMPTY_STRING);
......
......@@ -16,11 +16,13 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
package org.apache.skywalking.oap.server.analyzer.provider.trace;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import java.util.concurrent.atomic.AtomicReference;
......@@ -28,8 +30,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class TraceSampleRateWatcher extends ConfigChangeWatcher {
private AtomicReference<Integer> sampleRate;
public TraceSampleRateWatcher(TraceModuleProvider provider) {
super(TraceModule.NAME, provider, "sampleRate");
public TraceSampleRateWatcher(ModuleProvider provider) {
super(AnalyzerModule.NAME, provider, "sampleRate");
sampleRate = new AtomicReference<>();
sampleRate.set(getDefaultValue());
}
......@@ -60,7 +62,7 @@ public class TraceSampleRateWatcher extends ConfigChangeWatcher {
}
private int getDefaultValue() {
return ((TraceModuleProvider) this.getProvider()).getModuleConfig().getSampleRate();
return ((AnalyzerModuleConfig) this.getProvider().createConfigBeanIfAbsent()).getSampleRate();
}
public int getSampleRate() {
......
......@@ -15,16 +15,17 @@
* limitations under the License.
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
package org.apache.skywalking.oap.server.analyzer.provider.trace;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.yaml.snakeyaml.Yaml;
import java.io.FileNotFoundException;
......@@ -47,8 +48,8 @@ public class UninstrumentedGatewaysConfig extends ConfigChangeWatcher {
private final AtomicReference<String> settingsString;
private volatile Map<String, GatewayInstanceInfo> gatewayInstanceKeyedByAddress = Collections.emptyMap();
UninstrumentedGatewaysConfig(TraceModuleProvider provider) {
super(TraceModule.NAME, provider, "uninstrumentedGateways");
public UninstrumentedGatewaysConfig(ModuleProvider provider) {
super(AnalyzerModule.NAME, provider, "uninstrumentedGateways");
this.settingsString = new AtomicReference<>(Const.EMPTY_STRING);
final GatewayInfos defaultGateways = parseGatewaysFromFile("gateways.yml");
log.info("Default configured gateways: {}", defaultGateways);
......
......@@ -16,9 +16,9 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.AnalysisListenerFactory;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.AnalysisListenerFactory;
public interface ISegmentParserListenerManager {
void add(AnalysisListenerFactory analysisListenerFactory);
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.oap.server.library.module.Service;
......
......@@ -16,11 +16,12 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.AnalysisListenerFactory;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.AnalysisListenerFactory;
public class SegmentParserListenerManager implements ISegmentParserListenerManager {
......
......@@ -16,13 +16,13 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
/**
* The open service to the receivers.
......@@ -30,7 +30,7 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModu
@RequiredArgsConstructor
public class SegmentParserServiceImpl implements ISegmentParserService {
private final ModuleManager moduleManager;
private final TraceServiceModuleConfig config;
private final AnalyzerModuleConfig config;
@Setter
private SegmentParserListenerManager listenerManager;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser;
/**
* Reserved keys of the span. The backend analysis the metrics according the existed tags.
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser;
import java.util.ArrayList;
import java.util.List;
......@@ -26,20 +26,20 @@ import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.AnalysisListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.EntryAnalysisListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.ExitAnalysisListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.FirstAnalysisListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.LocalAnalysisListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SegmentListener;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.AnalysisListener;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.EntryAnalysisListener;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.ExitAnalysisListener;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.FirstAnalysisListener;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.LocalAnalysisListener;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.SegmentListener;
@Slf4j
@RequiredArgsConstructor
public class TraceAnalyzer {
private final ModuleManager moduleManager;
private final SegmentParserListenerManager listenerManager;
private final TraceServiceModuleConfig config;
private final AnalyzerModuleConfig config;
private List<AnalysisListener> analysisListeners = new ArrayList<>();
public void doAnalysis(SegmentObject segmentObject) {
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
/**
* AnalysisListener represents the callback when OAP does the trace segment analysis.
......
......@@ -16,15 +16,15 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
/**
* AnalysisListenerFactory implementation creates the listener instances when required. Every AnalysisListener could
* have its own creation factory.
*/
public interface AnalysisListenerFactory {
AnalysisListener create(ModuleManager moduleManager, TraceServiceModuleConfig config);
AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config);
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
......@@ -31,6 +31,7 @@ import org.apache.skywalking.apm.network.language.agent.v3.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.provider.trace.DBLatencyThresholdsAndWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
......@@ -46,11 +47,10 @@ import org.apache.skywalking.oap.server.core.source.RequestType;
import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.DBLatencyThresholdsAndWatcher;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SpanTags;
import static org.apache.skywalking.oap.server.receiver.trace.provider.parser.SpanTags.LOGIC_ENDPOINT;
import static org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SpanTags.LOGIC_ENDPOINT;
/**
* MultiScopesSpanListener includes the most segment to source(s) logic.
......@@ -66,7 +66,7 @@ public class MultiScopesAnalysisListener implements EntryAnalysisListener, ExitA
private final List<SourceBuilder> logicEndpointBuilders = new ArrayList<>(10);
private final Gson gson = new Gson();
private final SourceReceiver sourceReceiver;
private final TraceServiceModuleConfig config;
private final AnalyzerModuleConfig config;
private final NetworkAddressAliasCache networkAddressAliasCache;
private final NamingControl namingControl;
......@@ -371,7 +371,7 @@ public class MultiScopesAnalysisListener implements EntryAnalysisListener, ExitA
}
@Override
public AnalysisListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) {
return new MultiScopesAnalysisListener(
sourceReceiver, config, networkAddressAliasCache, namingControl);
}
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -31,7 +31,7 @@ import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.NetworkAddressAliasSetup;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
/**
* NetworkAddressAliasMappingListener use the propagated data in the segment reference, set up the alias relationship
......@@ -45,7 +45,7 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModu
@RequiredArgsConstructor
public class NetworkAddressAliasMappingListener implements EntryAnalysisListener {
private final SourceReceiver sourceReceiver;
private final TraceServiceModuleConfig config;
private final AnalyzerModuleConfig config;
private final NamingControl namingControl;
@Override
......@@ -107,7 +107,7 @@ public class NetworkAddressAliasMappingListener implements EntryAnalysisListener
}
@Override
public AnalysisListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) {
return new NetworkAddressAliasMappingListener(sourceReceiver, config, namingControl);
}
}
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -33,7 +33,7 @@ import org.apache.skywalking.oap.server.core.source.Segment;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceServiceModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
/**
* SegmentSpanListener forwards the segment raw data to the persistence layer with the query required conditions.
......@@ -174,7 +174,7 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
private final TraceSegmentSampler sampler;
private final NamingControl namingControl;
public Factory(ModuleManager moduleManager, TraceServiceModuleConfig config) {
public Factory(ModuleManager moduleManager, AnalyzerModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.sampler = new TraceSegmentSampler(config.getTraceSampleRateWatcher());
this.namingControl = moduleManager.find(CoreModule.NAME)
......@@ -183,7 +183,7 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
}
@Override
public AnalysisListener create(ModuleManager moduleManager, TraceServiceModuleConfig config) {
public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) {
return new SegmentAnalysisListener(sourceReceiver, sampler, namingControl);
}
}
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
......
......@@ -16,9 +16,9 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceSampleRateWatcher;
import org.apache.skywalking.oap.server.analyzer.provider.trace.TraceSampleRateWatcher;
/**
* The sampler makes the sampling mechanism works at backend side. Sample result: [0,sampleRate) sampled, (sampleRate,~)
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.analyzer.module.AnalyzerModule
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider
\ No newline at end of file
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import org.apache.skywalking.apm.network.language.agent.v3.Label;
import org.apache.skywalking.apm.network.language.agent.v3.MeterBucketValue;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import org.junit.Assert;
import org.junit.Test;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import io.vavr.Function2;
import org.apache.skywalking.apm.network.language.agent.v3.Label;
......
......@@ -16,17 +16,16 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import io.vavr.Function2;
import java.util.Arrays;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import java.util.Arrays;
import java.util.List;
public class EvalMultipleDataTest extends EvalDataBaseTest {
private EvalMultipleData singleMultiple;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import io.vavr.Function2;
import org.junit.Assert;
......
......@@ -16,14 +16,18 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import org.apache.skywalking.apm.network.language.agent.v3.Label;
import org.apache.skywalking.apm.network.language.agent.v3.MeterBucketValue;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterHistogram;
import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfigs;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
......@@ -32,20 +36,15 @@ import org.apache.skywalking.oap.server.core.analysis.meter.function.AvgFunction
import org.apache.skywalking.oap.server.core.analysis.meter.function.AvgHistogramFunction;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AvgHistogramPercentileFunction;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.meter.provider.MeterReceiverConfig;
import org.apache.skywalking.oap.server.receiver.meter.provider.config.MeterConfig;
import org.apache.skywalking.oap.server.receiver.meter.provider.config.MeterConfigs;
import org.junit.Before;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import java.util.HashMap;
import java.util.List;
import static org.mockito.Mockito.when;
public abstract class MeterBaseTest {
private static final String CONFIG_PATH = "meter-receive-config";
@Mock
protected CoreModuleProvider moduleProvider;
......@@ -81,11 +80,12 @@ public abstract class MeterBaseTest {
Whitebox.setInternalState(meterSystem, "functionRegister", map);
// load context
List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(new MeterReceiverConfig().getConfigPath());
final MeterProcessContext context = new MeterProcessContext(meterConfigs, moduleManager);
List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(CONFIG_PATH);
final MeterProcessService service = new MeterProcessService(moduleManager);
service.start(meterConfigs);
// create process and read meters
processor = context.createProcessor();
processor = service.createProcessor();
timestamp = System.currentTimeMillis();
// single value
......
......@@ -16,8 +16,10 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.meter.function.AcceptableValue;
......@@ -33,9 +35,6 @@ import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
......@@ -51,7 +50,8 @@ public class MeterBuilderTest extends MeterBaseTest {
values.add(invocationOnMock.getArgumentAt(0, AcceptableValue.class));
return null;
}).when(meterSystem).doStreamingCalculation(any());
final MeterProcessContext context = (MeterProcessContext) Whitebox.getInternalState(processor, "context");
final MeterProcessService context = (MeterProcessService) Whitebox.getInternalState(processor, "processService");
context.enabledBuilders().stream().peek(b -> doCallRealMethod().when(b).buildAndSend(any(), any()));
context.initMeters();
......
......@@ -16,11 +16,10 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
import org.junit.Test;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import java.util.function.Consumer;
import org.junit.Test;
public class MeterEvalOperationTest {
......
......@@ -16,12 +16,12 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import java.util.List;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfigs;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.receiver.meter.provider.MeterReceiverConfig;
import org.apache.skywalking.oap.server.receiver.meter.provider.config.MeterConfig;
import org.apache.skywalking.oap.server.receiver.meter.provider.config.MeterConfigs;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -29,8 +29,6 @@ import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
......@@ -38,23 +36,26 @@ import static org.mockito.Mockito.verify;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.management.*")
public class MeterProcessContextTest extends MeterBaseTest {
private static final String CONFIG_PATH = "meter-receive-config";
@Test
public void testInitMeter() throws ModuleStartException {
List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(new MeterReceiverConfig().getConfigPath());
final MeterProcessContext context = new MeterProcessContext(meterConfigs, moduleManager);
List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(CONFIG_PATH);
final MeterProcessService service = new MeterProcessService(moduleManager);
service.start(meterConfigs);
context.initMeters();
service.initMeters();
verify(meterSystem, times(3)).create(any(), any(), any());
Assert.assertEquals(3, context.enabledBuilders().size());
Assert.assertEquals(3, service.enabledBuilders().size());
}
@Test
public void testCreateNewProcessor() throws ModuleStartException {
List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(new MeterReceiverConfig().getConfigPath());
final MeterProcessContext context = new MeterProcessContext(meterConfigs, moduleManager);
List<MeterConfig> meterConfigs = MeterConfigs.loadConfig(CONFIG_PATH);
final MeterProcessService service = new MeterProcessService(moduleManager);
service.start(meterConfigs);
final MeterProcessor processor = context.createProcessor();
Assert.assertEquals(context, Whitebox.getInternalState(processor, "context"));
final MeterProcessor processor = service.createProcessor();
Assert.assertEquals(service, Whitebox.getInternalState(processor, "processService"));
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.receiver.meter.provider.process;
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import org.junit.Assert;
import org.junit.Test;
......@@ -67,7 +67,7 @@ public class MeterProcessorTest extends MeterBaseTest {
@Test
public void testProcess() {
// each builder has build and send
MeterProcessContext context = (MeterProcessContext) Whitebox.getInternalState(processor, "context");
MeterProcessService context = (MeterProcessService) Whitebox.getInternalState(processor, "processService");
List<MeterBuilder> builders = context.enabledBuilders().stream().map(Mockito::spy)
.peek(builder -> doNothing().when(builder).buildAndSend(any(), any())).collect(Collectors.toList());
Whitebox.setInternalState(context, "meterBuilders", builders);
......
......@@ -16,8 +16,11 @@
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
package org.apache.skywalking.oap.server.analyzer.provider.trace;
import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
......@@ -27,24 +30,21 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.Optional;
import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class TraceSampleRateWatcherTest {
private TraceModuleProvider traceModuleProvider;
private AnalyzerModuleProvider provider;
@Before
public void init() {
traceModuleProvider = new TraceModuleProvider();
provider = new AnalyzerModuleProvider();
}
@Test
public void testInit() {
TraceSampleRateWatcher traceSampleRateWatcher = new TraceSampleRateWatcher(traceModuleProvider);
TraceSampleRateWatcher traceSampleRateWatcher = new TraceSampleRateWatcher(provider);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 10000);
Assert.assertEquals(traceSampleRateWatcher.value(), "10000");
}
......@@ -53,7 +53,7 @@ public class TraceSampleRateWatcherTest {
public void testDynamicUpdate() throws InterruptedException {
ConfigWatcherRegister register = new MockConfigWatcherRegister(3);
TraceSampleRateWatcher watcher = new TraceSampleRateWatcher(traceModuleProvider);
TraceSampleRateWatcher watcher = new TraceSampleRateWatcher(provider);
register.registerConfigChangeWatcher(watcher);
register.start();
......@@ -61,31 +61,35 @@ public class TraceSampleRateWatcherTest {
Thread.sleep(2000);
}
assertThat(watcher.getSampleRate(), is(9000));
assertThat(traceModuleProvider.getModuleConfig().getSampleRate(), is(10000));
assertThat(provider.getModuleConfig().getSampleRate(), is(10000));
}
@Test
public void testNotify() {
TraceSampleRateWatcher traceSampleRateWatcher = new TraceSampleRateWatcher(traceModuleProvider);
ConfigChangeWatcher.ConfigChangeEvent value1 = new ConfigChangeWatcher.ConfigChangeEvent("8000", ConfigChangeWatcher.EventType.MODIFY);
TraceSampleRateWatcher traceSampleRateWatcher = new TraceSampleRateWatcher(provider);
ConfigChangeWatcher.ConfigChangeEvent value1 = new ConfigChangeWatcher.ConfigChangeEvent(
"8000", ConfigChangeWatcher.EventType.MODIFY);
traceSampleRateWatcher.notify(value1);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 8000);
Assert.assertEquals(traceSampleRateWatcher.value(), "8000");
ConfigChangeWatcher.ConfigChangeEvent value2 = new ConfigChangeWatcher.ConfigChangeEvent("8000", ConfigChangeWatcher.EventType.DELETE);
ConfigChangeWatcher.ConfigChangeEvent value2 = new ConfigChangeWatcher.ConfigChangeEvent(
"8000", ConfigChangeWatcher.EventType.DELETE);
traceSampleRateWatcher.notify(value2);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 10000);
Assert.assertEquals(traceSampleRateWatcher.value(), "10000");
ConfigChangeWatcher.ConfigChangeEvent value3 = new ConfigChangeWatcher.ConfigChangeEvent("500", ConfigChangeWatcher.EventType.ADD);
ConfigChangeWatcher.ConfigChangeEvent value3 = new ConfigChangeWatcher.ConfigChangeEvent(
"500", ConfigChangeWatcher.EventType.ADD);
traceSampleRateWatcher.notify(value3);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 500);
Assert.assertEquals(traceSampleRateWatcher.value(), "500");
ConfigChangeWatcher.ConfigChangeEvent value4 = new ConfigChangeWatcher.ConfigChangeEvent("abc", ConfigChangeWatcher.EventType.MODIFY);
ConfigChangeWatcher.ConfigChangeEvent value4 = new ConfigChangeWatcher.ConfigChangeEvent(
"abc", ConfigChangeWatcher.EventType.MODIFY);
traceSampleRateWatcher.notify(value4);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 500);
......@@ -101,7 +105,7 @@ public class TraceSampleRateWatcherTest {
@Override
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable table = new ConfigTable();
table.add(new ConfigTable.ConfigItem("receiver-trace.default.sampleRate", "9000"));
table.add(new ConfigTable.ConfigItem("agent-analyzer.default.sampleRate", "9000"));
return Optional.of(table);
}
}
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
meters:
- name: build_test1
scope:
type: SERVICE
meter:
operation: avg
value: meter["test_count1"].tagFilter("k1", "v1").scale(2)
- name: build_test2
scope:
type: SERVICE_INSTANCE
meter:
operation: avgHistogram
value: meter["test_histogram"]
- name: build_test3
scope:
type: ENDPOINT
endpoint: test_endpoint
meter:
operation: avgHistogramPercentile
value: meter["test_histogram"]
percentile:
- 50
- 90
- 99
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>analyzer</artifactId>
<packaging>pom</packaging>
<modules>
<module>agent-analyzer</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-module</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>library-util</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -29,6 +29,7 @@
<packaging>pom</packaging>
<modules>
<module>server-core</module>
<module>analyzer</module>
<module>server-receiver-plugin</module>
<module>server-cluster-plugin</module>
<module>server-storage-plugin</module>
......@@ -95,7 +96,8 @@
<zookeeper.image.version>3.5</zookeeper.image.version>
<protobuf-java-util.version>3.11.4</protobuf-java-util.version>
<kafka-clients.version>2.4.1</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
</properties>
<dependencies>
......
......@@ -134,6 +134,11 @@
<artifactId>prometheus-fetcher-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>kafka-fetcher-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- fetcher module -->
<!-- storage module -->
......
......@@ -158,6 +158,12 @@ storage:
duration: ${SW_STORAGE_INFLUXDB_DURATION:1000} # the time to wait at most (milliseconds)
fetchTaskLogMaxSize: ${SW_STORAGE_INFLUXDB_FETCH_TASK_LOG_MAX_SIZE:5000} # the max number of fetch task log in a request
agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
default:
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-sharing-server:
selector: ${SW_RECEIVER_SHARING_SERVER:default}
default:
......@@ -176,8 +182,6 @@ receiver-register:
receiver-trace:
selector: ${SW_RECEIVER_TRACE:default}
default:
sampleRate: ${SW_TRACE_SAMPLE_RATE:10000} # The sample rate precision is 1/10000. 10000 means 100% sample in default.
slowDBAccessThreshold: ${SW_SLOW_DB_THRESHOLD:default:200,mongodb:100} # The slow database access thresholds. Unit ms.
receiver-jvm:
selector: ${SW_RECEIVER_JVM:default}
......@@ -210,10 +214,20 @@ prometheus-fetcher:
default:
active: ${SW_PROMETHEUS_FETCHER_ACTIVE:false}
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:-}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
receiver-meter:
selector: ${SW_RECEIVER_METER:-}
default:
receiver-oc:
selector: ${SW_OC_RECEIVER:-}
default:
......
......@@ -44,7 +44,7 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
private volatile Address selfAddress;
ZookeeperCoordinator(ClusterModuleZookeeperConfig config,
ServiceDiscovery<RemoteInstance> serviceDiscovery) throws Exception {
ServiceDiscovery<RemoteInstance> serviceDiscovery) throws Exception {
this.config = config;
this.serviceDiscovery = serviceDiscovery;
this.serviceCache = serviceDiscovery.serviceCacheBuilder().name(REMOTE_NAME_PATH).build();
......@@ -62,11 +62,11 @@ public class ZookeeperCoordinator implements ClusterRegister, ClusterNodesQuery
.id(UUID.randomUUID()
.toString())
.address(remoteInstance
.getAddress()
.getHost())
.getAddress()
.getHost())
.port(remoteInstance
.getAddress()
.getPort())
.getAddress()
.getPort())
.payload(remoteInstance)
.build();
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-fetcher-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-fetcher-plugin</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>agent-analyzer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka-test.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.agent.kafka;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
/**
 * Configures and initializes a KafkaConsumer client acting as a dispatcher, delivering each Kafka
 * message to the handler registered for its topic.
 */
@Slf4j
public class KafkaFetcherHandlerRegister implements Runnable {

    private final ImmutableMap.Builder<String, KafkaHandler> builder = ImmutableMap.builder();
    private ImmutableMap<String, KafkaHandler> handlerMap;

    /** Explicit partition assignments, only used when the OAP cluster shards topic partitions. */
    private final List<TopicPartition> topicPartitions = Lists.newArrayList();
    private final KafkaConsumer<String, Bytes> consumer;
    private final KafkaFetcherConfig config;
    private final boolean isSharding;

    /**
     * Builds the Kafka consumer and makes sure all required topics exist.
     *
     * @param config the kafka-fetcher module configuration.
     * @throws ModuleStartException if required topics are missing and cannot (or must not) be created.
     */
    public KafkaFetcherHandlerRegister(KafkaFetcherConfig config) throws ModuleStartException {
        this.config = config;

        // Copy the user-supplied consumer config instead of passing it as Properties *defaults*:
        // Kafka's client config reads the Properties through its Map view, which does NOT include
        // defaults, so `new Properties(defaults)` would silently drop every user setting.
        Properties properties = new Properties();
        properties.putAll(config.getKafkaConsumerConfig());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());

        ensureTopics(properties);

        isSharding = config.isSharding() && StringUtil.isNotEmpty(config.getConsumePartitions());

        consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer());
    }

    /**
     * Verifies that every topic this fetcher consumes exists, creating the missing ones when
     * {@code createTopicIfNotExist} is enabled.
     */
    private void ensureTopics(Properties properties) throws ModuleStartException {
        // AdminClient is AutoCloseable; close it once the topic check/creation is done.
        try (AdminClient adminClient = AdminClient.create(properties)) {
            Set<String> missedTopics = adminClient.describeTopics(Lists.newArrayList(
                config.getTopicNameOfManagements(),
                config.getTopicNameOfMetrics(),
                config.getTopicNameOfProfiling(),
                config.getTopicNameOfTracingSegments(),
                config.getTopicNameOfMeters()
            ))
                                                  .values()
                                                  .entrySet()
                                                  .stream()
                                                  .map(entry -> {
                                                      try {
                                                          entry.getValue().get();
                                                          // Describe succeeded, the topic exists.
                                                          return null;
                                                      } catch (InterruptedException e) {
                                                          // Restore the interrupt flag instead of swallowing it.
                                                          Thread.currentThread().interrupt();
                                                          return entry.getKey();
                                                      } catch (ExecutionException e) {
                                                          // Typically UnknownTopicOrPartitionException: topic is missing.
                                                          return entry.getKey();
                                                      }
                                                  })
                                                  .filter(Objects::nonNull)
                                                  .collect(Collectors.toSet());

            if (missedTopics.isEmpty()) {
                return;
            }
            log.info("Topics {} do not exist.", missedTopics);

            if (!config.isCreateTopicIfNotExist()) {
                // Honor the config switch: fail fast rather than create topics behind the operator's back.
                throw new ModuleStartException("Required Kafka topics " + missedTopics + " do not exist.");
            }

            List<NewTopic> newTopicList = missedTopics.stream()
                                                      .map(topic -> new NewTopic(
                                                          topic,
                                                          config.getPartitions(),
                                                          (short) config.getReplicationFactor()
                                                      ))
                                                      .collect(Collectors.toList());
            try {
                adminClient.createTopics(newTopicList).all().get();
            } catch (Exception e) {
                throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
            }
        }
    }

    /**
     * Registers a handler for its topic; must be called before {@link #start()}.
     */
    public void register(KafkaHandler handler) {
        builder.put(handler.getTopic(), handler);
        topicPartitions.addAll(handler.getTopicPartitions());
    }

    /**
     * Freezes the handler map, attaches the consumer to its topics (or, when sharding, to the
     * explicitly assigned partitions), skips to the latest offsets, and starts the polling thread.
     */
    public void start() {
        handlerMap = builder.build();

        if (isSharding) {
            consumer.assign(topicPartitions);
        } else {
            consumer.subscribe(handlerMap.keySet());
        }
        // Only consume data produced after startup; historical records are ignored.
        consumer.seekToEnd(consumer.assignment());
        Executors.newSingleThreadExecutor(new DefaultThreadFactory("KafkaConsumer")).submit(this);
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
            if (consumerRecords.isEmpty()) {
                continue;
            }
            for (final ConsumerRecord<String, Bytes> record : consumerRecords) {
                try {
                    handlerMap.get(record.topic()).handle(record);
                } catch (Exception e) {
                    // A single bad record must not kill the consumer thread; log and move on.
                    log.error("Failed to handle the record from topic {}.", record.topic(), e);
                }
            }
            consumer.commitAsync();
        }
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.agent.kafka.module;
import java.util.Properties;
import lombok.Data;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
 * The settings of the kafka-fetcher module, populated from the {@code kafka-fetcher} section of
 * {@code application.yml}. Getters/setters are generated by Lombok's {@code @Data}.
 */
@Data
public class KafkaFetcherConfig extends ModuleConfig {

    /**
     * Extra Kafka consumer properties, passed through to the Kafka client as-is.
     */
    private Properties kafkaConsumerConfig = new Properties();

    /**
     * <B>bootstrap.servers</B>: A list of host/port pairs to use for establishing the initial
     * connection to the Kafka cluster.
     */
    private String bootstrapServers;

    /**
     * <B>group.id</B>: A unique string that identifies the consumer group this consumer belongs to.
     */
    private String groupId = "skywalking-consumer";

    /**
     * Which PartitionId(s) of the topics assign to the OAP server. If more than one, is separated by commas.
     */
    private String consumePartitions = "";

    /**
     * isSharding was true when OAP Server in cluster.
     */
    private boolean isSharding = false;

    /**
     * If true, create the Kafka topic when it does not exist.
     */
    private boolean createTopicIfNotExist = true;

    /**
     * The number of partitions for the topic being created.
     */
    private int partitions = 3;

    /**
     * The replication factor for each partition in the topic being created.
     */
    private int replicationFactor = 2;

    // Whether meter-system messages from the agents are analyzed.
    private boolean enableMeterSystem = false;

    // Classpath directory from which the meter analysis configs are loaded.
    private String configPath = "meter-receive-config";

    // Topic names this fetcher consumes, one per kind of agent payload.
    private String topicNameOfMetrics = "skywalking-metrics";

    private String topicNameOfProfiling = "skywalking-profilings";

    private String topicNameOfTracingSegments = "skywalking-segments";

    private String topicNameOfManagements = "skywalking-managements";

    private String topicNameOfMeters = "skywalking-meters";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.agent.kafka.module;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
 * Module definition of the Kafka fetcher, which consumes agent payloads from Kafka topics.
 */
public class KafkaFetcherModule extends ModuleDefine {

    public static final String NAME = "kafka-fetcher";

    public KafkaFetcherModule() {
        super(NAME);
    }

    @Override
    public Class[] services() {
        // This module provides no services for other modules to look up.
        return new Class[0];
    }
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册