未验证 提交 828e6e2f 编写于 作者: P pg.yang 提交者: GitHub

Add virtual MQ analysis for native traces (#9855)

上级 93db4612
......@@ -584,9 +584,10 @@ jobs:
config: test/e2e-v2/cases/zipkin/mysql/sharding/e2e.yaml
- name: APISIX metrics
config: test/e2e-v2/cases/apisix/otel-collector/e2e.yaml
- name: Exporter Kafka
config: test/e2e-v2/cases/exporter/kafka/e2e.yaml
- name: Virtual MQ
config: test/e2e-v2/cases/virtual-mq/e2e.yaml
steps:
- uses: actions/checkout@v3
with:
......
......@@ -78,6 +78,7 @@
original `Service,ServiceInstance,ServiceRelation,ServiceInstanceRelation`.
* [**Breaking Change**] TCP-related source names are changed, fields of TCP-related sources are changed, please refer to the latest `oal/tcp.oal` file.
* Do not log error logs when failed to create ElasticSearch index because the index is created already.
* Add virtual MQ analysis for native traces.
* Support Python runtime metrics analysis.
* Support `sampledTrace` in LAL.
* Support multiple rules with different names under the same layer of LAL script.
......@@ -116,6 +117,8 @@
* Fix configuration panel styles.
* Remove a un-use icon.
* Support labeled value on the service/instance/endpoint list widgets.
* Add menu for virtual MQ
* Set selector props and update configuration panel styles
* Add Python runtime metrics and cpu/memory utilization panels to General-Instance and Fass-Instance dashboards
#### Documentation
......
......@@ -347,7 +347,7 @@ This calculates the metrics data from [events](event.md).
### SCOPE `DatabaseAccess`
This calculates the metrics data from each request of cache system.
This calculates the metrics data from each request of database.
| Name | Remarks | Group Key | Type |
|---|---|---|---|
......@@ -358,7 +358,7 @@ This calculates the metrics data from each request of cache system.
### SCOPE `DatabaseSlowStatement`
This calculates the metrics data from slow request of cache system , which is used for `write` or `read` operation.
This calculates the metrics data from slow request of database.
| Name | Remarks | Group Key | Type |
|---|---|---|---|
......@@ -394,3 +394,26 @@ This calculates the metrics data from slow request of cache system , which is us
| traceId | The traceId of this slow access| | string|
| status | Indicates the success or failure of the request.| | boolean |
| operation | Indicates this access is used for `write` or `read` | | string |
### SCOPE `MQAccess`
This calculates the service dimensional metrics data from each request of MQ system on consume/produce side
| Name | Remarks | Group Key | Type |
|---|---|---|---|
| name | The service name , usually it's MQ address(es) | |string|
| transmissionLatency | The latency from produce side to consume side . | | int(in ms)|
| status | Indicates the success or failure of the request.| | boolean |
| operation | Indicates this access is on `Produce` or `Consume` side | | enum |
### SCOPE `MQEndpointAccess`
This calculates the endpoint dimensional metrics data from each request of MQ system on consume/produce side
| Name | Remarks | Group Key | Type |
|---|---|---|---|
| serviceName | The service name that this endpoint belongs to. | | string |
| endpoint | The endpoint name , usually it's combined by `queue`,`topic` | | string |
| transmissionLatency | The latency from produce side to consume side . | | int(in ms)|
| status | Indicates the success or failure of the request.| | boolean |
| operation | Indicates this access is on `Produce` or `Consume` side | | enum |
# Virtual Message Queue (MQ)
Virtual MQ represent the MQ nodes detected by [server agents' plugins](server-agents.md). The performance
metrics of the MQ are also from the MQ client-side perspective.
For example, Kafka plugins in the Java agent could detect the transmission latency of message
As a result, SkyWalking would show message count, transmission latency, success rate powered by backend analysis capabilities in this dashboard.
The MQ operation span should have
- It is an **Exit**(at producer side) or **Entry**(at consumer side) span
- **Span's layer == MQ**
- Tag key = `mq.queue`, value = MQ queue name
- Tag key = `mq.topic`, value = MQ queue topic , it's optional as some MQ don't have topic concept.
- Tag key = `transmission.latency`, value = Transmission latency from consumer to producer
\ No newline at end of file
......@@ -153,6 +153,8 @@ catalog:
path: "/en/setup/service-agent/virtual-database"
- name: "Virtual Cache"
path: "/en/setup/service-agent/virtual-cache"
- name: "Virtual MQ"
path: "/en/setup/service-agent/virtual-mq"
- name: "Service Mesh"
catalog:
- name: "Observe Service Mesh"
......
......@@ -45,6 +45,12 @@ public class SpanTags {
public static final String CACHE_KEY = "cache.key";
public static final String MQ_QUEUE = "mq.queue";
public static final String MQ_TOPIC = "mq.topic";
public static final String TRANSMISSION_LATENCY = "transmission.latency";
/**
* Tag, x-le(extension logic endpoint) series tag. Value is JSON format.
* <pre>
......
......@@ -18,27 +18,27 @@
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener;
import java.util.Arrays;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleConfig;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.vservice.VirtualCacheProcessor;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.vservice.VirtualDatabaseProcessor;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.vservice.VirtualMQProcessor;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.vservice.VirtualServiceProcessor;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import java.util.Arrays;
import java.util.List;
/**
* Virtual Service represent remote service
*/
@RequiredArgsConstructor
public class VirtualServiceAnalysisListener implements ExitAnalysisListener, LocalAnalysisListener {
public class VirtualServiceAnalysisListener implements ExitAnalysisListener, LocalAnalysisListener, EntryAnalysisListener {
private final SourceReceiver sourceReceiver;
private final List<VirtualServiceProcessor> virtualServiceProcessors;
......@@ -50,7 +50,7 @@ public class VirtualServiceAnalysisListener implements ExitAnalysisListener, Loc
@Override
public boolean containsPoint(Point point) {
return point == Point.Local || point == Point.Exit;
return point == Point.Local || point == Point.Exit || point == Point.Entry;
}
@Override
......@@ -63,6 +63,11 @@ public class VirtualServiceAnalysisListener implements ExitAnalysisListener, Loc
virtualServiceProcessors.forEach(p -> p.prepareVSIfNecessary(span, segmentObject));
}
@Override
public void parseEntry(final SpanObject span, final SegmentObject segmentObject) {
virtualServiceProcessors.forEach(p -> p.prepareVSIfNecessary(span, segmentObject));
}
public static class Factory implements AnalysisListenerFactory {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
......@@ -76,11 +81,13 @@ public class VirtualServiceAnalysisListener implements ExitAnalysisListener, Loc
@Override
public AnalysisListener create(ModuleManager moduleManager, AnalyzerModuleConfig config) {
return new VirtualServiceAnalysisListener(sourceReceiver,
Arrays.asList(
new VirtualCacheProcessor(namingControl, config),
new VirtualDatabaseProcessor(namingControl, config)
)
return new VirtualServiceAnalysisListener(
sourceReceiver,
Arrays.asList(
new VirtualCacheProcessor(namingControl, config),
new VirtualDatabaseProcessor(namingControl, config),
new VirtualMQProcessor(namingControl)
)
);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.vservice;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentReference;
import org.apache.skywalking.apm.network.language.agent.v3.SpanLayer;
import org.apache.skywalking.apm.network.language.agent.v3.SpanObject;
import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.SpanTags;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.EndpointMeta;
import org.apache.skywalking.oap.server.core.source.MQAccess;
import org.apache.skywalking.oap.server.core.source.MQEndpointAccess;
import org.apache.skywalking.oap.server.core.source.MQOperation;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.Source;
import org.apache.skywalking.oap.server.library.util.StringUtil;
@RequiredArgsConstructor
public class VirtualMQProcessor implements VirtualServiceProcessor {
private final NamingControl namingControl;
private final List<Source> sourceList = new ArrayList<>();
@Override
public void prepareVSIfNecessary(final SpanObject span, final SegmentObject segmentObject) {
if (span.getSpanLayer() != SpanLayer.MQ) {
return;
}
if (!(span.getSpanType() == SpanType.Exit || span.getSpanType() == SpanType.Entry)) {
return;
}
MQTags mqTags = collectTags(span.getTagsList());
final MQOperation mqOperation;
final String serviceName;
if (span.getSpanType() == SpanType.Entry) {
mqOperation = MQOperation.Consume;
final String peer = span.getRefsList()
.stream()
.findFirst()
.map(SegmentReference::getNetworkAddressUsedAtPeer)
.orElse(null);
serviceName = namingControl.formatServiceName(peer);
} else {
mqOperation = MQOperation.Produce;
serviceName = namingControl.formatServiceName(span.getPeer());
}
long timeBucket = TimeBucket.getMinuteTimeBucket(span.getStartTime());
sourceList.add(toServiceMeta(serviceName, timeBucket));
String endpoint = buildEndpointName(mqTags.topic, mqTags.queue);
String endpointName = namingControl.formatEndpointName(serviceName, endpoint);
sourceList.add(toEndpointMeta(serviceName, endpointName, timeBucket));
MQAccess access = new MQAccess();
access.setTypeId(span.getComponentId());
access.setTransmissionLatency(mqTags.transmissionLatency);
access.setName(serviceName);
access.setStatus(!span.getIsError());
access.setTimeBucket(timeBucket);
access.setOperation(mqOperation);
sourceList.add(access);
MQEndpointAccess endpointAccess = new MQEndpointAccess();
endpointAccess.setTypeId(span.getComponentId());
endpointAccess.setTransmissionLatency(mqTags.transmissionLatency);
endpointAccess.setStatus(!span.getIsError());
endpointAccess.setTimeBucket(timeBucket);
endpointAccess.setOperation(mqOperation);
endpointAccess.setServiceName(serviceName);
endpointAccess.setEndpoint(endpointName);
sourceList.add(endpointAccess);
}
private String buildEndpointName(String topic, String queue) {
return Stream.of(topic, queue)
.filter(StringUtil::isNotBlank)
.reduce((a, b) -> a + "/" + b).orElse("");
}
private MQTags collectTags(final List<KeyStringValuePair> tagsList) {
MQTags mqTags = new MQTags();
for (KeyStringValuePair keyStringValuePair : tagsList) {
if (SpanTags.MQ_TOPIC.equals(keyStringValuePair.getKey())) {
mqTags.topic = keyStringValuePair.getValue();
} else if (SpanTags.MQ_QUEUE.equals(keyStringValuePair.getKey())) {
mqTags.queue = keyStringValuePair.getValue();
} else if (SpanTags.TRANSMISSION_LATENCY.equals(keyStringValuePair.getKey())) {
mqTags.transmissionLatency = StringUtil.isBlank(keyStringValuePair.getValue()) ? 0L : Long.parseLong(
keyStringValuePair.getValue());
}
}
return mqTags;
}
private ServiceMeta toServiceMeta(String serviceName, Long timeBucket) {
ServiceMeta service = new ServiceMeta();
service.setName(serviceName);
service.setLayer(Layer.VIRTUAL_MQ);
service.setTimeBucket(timeBucket);
return service;
}
private EndpointMeta toEndpointMeta(String serviceName, String endpoint, Long timeBucket) {
EndpointMeta endpointMeta = new EndpointMeta();
endpointMeta.setServiceName(serviceName);
endpointMeta.setServiceNormal(false);
endpointMeta.setEndpoint(endpoint);
endpointMeta.setTimeBucket(timeBucket);
return endpointMeta;
}
@Override
public void emitTo(final Consumer<Source> consumer) {
sourceList.forEach(consumer);
}
private static class MQTags {
private String topic;
private String queue;
private long transmissionLatency;
}
}
......@@ -50,6 +50,8 @@ SRC_SERVICE_INSTANCE_CLR_THREAD: 'ServiceInstanceCLRThread';
SRC_ENVOY_INSTANCE_METRIC: 'EnvoyInstanceMetric';
SRC_EVENT: 'Event';
SRC_CACHE_ACCESS: 'CacheAccess';
SRC_MQ_ACCESS: 'MQAccess';
SRC_MQ_ENDPOINT_ACCESS: 'MQEndpointAccess';
// Browser keywords
......
......@@ -57,7 +57,7 @@ source
SRC_ENVOY_INSTANCE_METRIC |
SRC_BROWSER_APP_PERF | SRC_BROWSER_APP_PAGE_PERF | SRC_BROWSER_APP_SINGLE_VERSION_PERF |
SRC_BROWSER_APP_TRAFFIC | SRC_BROWSER_APP_PAGE_TRAFFIC | SRC_BROWSER_APP_SINGLE_VERSION_TRAFFIC |
SRC_EVENT
SRC_EVENT | SRC_MQ_ACCESS | SRC_MQ_ENDPOINT_ACCESS
;
disableSource
......
......@@ -59,6 +59,7 @@ public class UITemplateInitializer {
Layer.SO11Y_SATELLITE.name(),
Layer.FAAS.name(),
Layer.APISIX.name(),
Layer.VIRTUAL_MQ.name(),
"custom"
};
private final UITemplateManagementService uiTemplateManagementService;
......
......@@ -36,9 +36,14 @@ public class CacheAccess extends Source {
@Override
public String getEntityId() {
return IDManager.ServiceID.buildId(name, false);
if (entityId == null) {
entityId = IDManager.ServiceID.buildId(name, false);
}
return entityId;
}
private String entityId;
@Getter
@Setter
@ScopeDefaultColumn.DefinedByField(columnName = "name", requireDynamicActive = true)
......
......@@ -36,9 +36,14 @@ public class DatabaseAccess extends Source {
@Override
public String getEntityId() {
return IDManager.ServiceID.buildId(name, false);
if (entityId == null) {
entityId = IDManager.ServiceID.buildId(name, false);
}
return entityId;
}
private String entityId;
@Getter
@Setter
@ScopeDefaultColumn.DefinedByField(columnName = "name", requireDynamicActive = true)
......
......@@ -116,6 +116,9 @@ public class DefaultScopeDefine {
public static final int TCP_SERVICE_INSTANCE_UPDATE = 61;
public static final int SAMPLED_SLOW_TRACE = 62;
public static final int MESSAGE_QUEUE_ACCESS = 63;
public static final int MESSAGE_QUEUE_ENDPOINT_ACCESS = 64;
/**
* Catalog of scope, the metrics processor could use this to group all generated metrics by oal rt.
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.MESSAGE_QUEUE_ACCESS;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SERVICE_CATALOG_NAME;
@ScopeDeclaration(id = MESSAGE_QUEUE_ACCESS, name = "MQAccess", catalog = SERVICE_CATALOG_NAME)
@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
public class MQAccess extends Source {
@Override
public int scope() {
return MESSAGE_QUEUE_ACCESS;
}
@Override
public String getEntityId() {
if (entityId == null) {
entityId = IDManager.ServiceID.buildId(name, false);
}
return entityId;
}
private String entityId;
@Getter
@Setter
@ScopeDefaultColumn.DefinedByField(columnName = "name", requireDynamicActive = true)
private String name;
@Getter
@Setter
private int typeId;
@Getter
@Setter
private long transmissionLatency;
@Getter
@Setter
private boolean status;
@Getter
@Setter
private MQOperation operation;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.ENDPOINT_CATALOG_NAME;
import static org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.MESSAGE_QUEUE_ENDPOINT_ACCESS;
@ScopeDeclaration(id = MESSAGE_QUEUE_ENDPOINT_ACCESS, name = "MQEndpointAccess", catalog = ENDPOINT_CATALOG_NAME)
@ScopeDefaultColumn.VirtualColumnDefinition(fieldName = "entityId", columnName = "entity_id", isID = true, type = String.class)
public class MQEndpointAccess extends Source {
@Override
public int scope() {
return MESSAGE_QUEUE_ENDPOINT_ACCESS;
}
@Override
public String getEntityId() {
if (entityId == null) {
entityId = IDManager.EndpointID.buildId(serviceId, endpoint);
}
return entityId;
}
private String entityId;
@Getter
@ScopeDefaultColumn.DefinedByField(columnName = "service_id")
private String serviceId;
@Getter
@Setter
@ScopeDefaultColumn.DefinedByField(columnName = "service_name", requireDynamicActive = true)
private String serviceName;
@Getter
@Setter
@ScopeDefaultColumn.DefinedByField(columnName = "endpoint", requireDynamicActive = true)
private String endpoint;
@Getter
@Setter
private int typeId;
@Getter
@Setter
private long transmissionLatency;
@Getter
@Setter
private boolean status;
@Getter
@Setter
private MQOperation operation;
@Override
public void prepare() {
this.serviceId = IDManager.ServiceID.buildId(serviceName, false);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.core.source;
public enum MQOperation {
Consume,
Produce
}
......@@ -55,7 +55,6 @@ endpoint_cpm = from(Endpoint.*).cpm();
endpoint_resp_time = from(Endpoint.latency).longAvg();
endpoint_sla = from(Endpoint.*).percent(status == true);
endpoint_percentile = from(Endpoint.latency).percentile(10); // Multiple values including p50, p75, p90, p95, p99
endpoint_mq_consume_count = from(Endpoint.*).filter(type == RequestType.MQ).count();
endpoint_mq_consume_latency = from((str->long)Endpoint.tag["transmission.latency"]).filter(type == RequestType.MQ).filter(tag["transmission.latency"] != null).longAvg();
// Endpoint relation scope metrics
......@@ -83,3 +82,17 @@ cache_access_resp_time = from(CacheAccess.latency).longAvg();
cache_access_sla = from(CacheAccess.*).percent(status == true);
cache_access_cpm = from(CacheAccess.*).cpm();
cache_access_percentile = from(CacheAccess.latency).percentile(10);
mq_service_consume_cpm = from(MQAccess.*).filter(operation == MQOperation.Consume).cpm();
mq_service_consume_sla = from(MQAccess.*).filter(operation == MQOperation.Consume).percent(status == true);
mq_service_consume_latency = from(MQAccess.transmissionLatency).filter(operation == MQOperation.Consume).longAvg();
mq_service_consume_percentile = from(MQAccess.transmissionLatency).filter(operation == MQOperation.Consume).percentile(10);
mq_service_produce_cpm = from(MQAccess.*).filter(operation == MQOperation.Produce).cpm();
mq_service_produce_sla = from(MQAccess.*).filter(operation == MQOperation.Produce).percent(status == true);
mq_endpoint_consume_cpm = from(MQEndpointAccess.*).filter(operation == MQOperation.Consume).cpm();
mq_endpoint_consume_latency = from(MQEndpointAccess.transmissionLatency).filter(operation == MQOperation.Consume).longAvg();
mq_endpoint_consume_percentile = from(MQEndpointAccess.transmissionLatency).filter(operation == MQOperation.Consume).percentile(10);
mq_endpoint_consume_sla = from(MQEndpointAccess.*).filter(operation == MQOperation.Consume).percent(status == true);
mq_endpoint_produce_cpm = from(MQEndpointAccess.*).filter(operation == MQOperation.Produce).cpm();
mq_endpoint_produce_sla = from(MQEndpointAccess.*).filter(operation == MQOperation.Produce).percent(status == true);
......@@ -128,7 +128,7 @@
]
},
{
"x": 16,
"x": 8,
"y": 13,
"w": 8,
"h": 13,
......@@ -173,52 +173,6 @@
}
]
},
{
"x": 8,
"y": 13,
"w": 8,
"h": 13,
"i": "6",
"type": "Widget",
"widget": {
"title": "Message Queue Consuming Count",
"tips": "The number of consumed messages.",
"name": "Message_Consuming_Count"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"endpoint_mq_consume_count"
],
"metricTypes": [
"readMetricsValues"
],
"value": "0-0-6",
"label": "Message_Consuming_Count",
"associate": [
{
"widgetId": "0-0-3"
},
{
"widgetId": "0-0-4"
},
{
"widgetId": "0-0-5"
},
{
"widgetId": "0-0-7"
},
{
"widgetId": "0-0-8"
}
]
},
{
"x": 0,
"y": 13,
......@@ -273,7 +227,8 @@
"type": "Widget",
"widget": {
"title": "Endpoint Load (calls / min)",
"name": "Endpoint_Load"
"name": "Endpoint_Load",
"tips": "requests(HTTP / RPC) / min, consuming messages(MQ) / min"
},
"graph": {
"type": "Line",
......
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
[
{
"id": "Virtual-MQ-Endpoint",
"configuration": {
"children": [
{
"x": 0,
"y": 0,
"w": 6,
"h": 13,
"i": "1",
"type": "Widget",
"widget": {
"title": "Consume Traffic (calls / min)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_endpoint_consume_cpm"
],
"metricTypes": [
"readMetricsValues"
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 6,
"y": 0,
"w": 6,
"h": 13,
"i": "2",
"type": "Widget",
"widget": {
"title": "Consume Access Successful Rate (%)",
"name": "Successful_Rate"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_endpoint_consume_sla"
],
"metricTypes": [
"readMetricsValues"
],
"metricConfig": [
{
"calculation": "percentage"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 12,
"y": 0,
"w": 6,
"h": 13,
"i": "3",
"type": "Widget",
"widget": {
"title": "Transmission Latency Percentile (ms)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_endpoint_consume_percentile"
],
"metricTypes": [
"readLabeledMetricsValues"
],
"metricConfig": [
{
"label": "P50, P75, P90, P95, P99",
"labelsIndex": "0,1,2,3,4"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 18,
"y": 0,
"w": 6,
"h": 13,
"i": "4",
"type": "Widget",
"widget": {
"title": "Transmission AVG Latency (ms)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_endpoint_consume_latency"
],
"metricTypes": [
"readMetricsValues"
],
"metricConfig": [
{
"label": "P50, P75, P90, P95, P99",
"labelsIndex": "0,1,2,3,4"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 0,
"y": 13,
"w": 6,
"h": 13,
"i": "5",
"type": "Widget",
"widget": {
"title": "Produce Traffic (calls / min)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_endpoint_produce_cpm"
],
"metricTypes": [
"readMetricsValues"
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 6,
"y": 13,
"w": 6,
"h": 13,
"i": "6",
"type": "Widget",
"widget": {
"title": "Produce Access Successful Rate (%)",
"name": "Successful_Rate"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_endpoint_produce_sla"
],
"metricTypes": [
"readMetricsValues"
],
"metricConfig": [
{
"calculation": "percentage"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
}
],
"layer": "VIRTUAL_MQ",
"entity": "Endpoint",
"name": "Virtual-MQ-Endpoint",
"id": "Virtual-MQ-Endpoint",
"isRoot": false
}
}
]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
[
{
"id": "Virtual-MQ-Root",
"configuration": {
"children": [
{
"x": 0,
"y": 2,
"w": 24,
"h": 52,
"i": "0",
"type": "Widget",
"widget": {
"title": "Virtual MQ"
},
"graph": {
"type": "ServiceList",
"dashboardName": "Virtual-MQ-Service",
"fontSize": 12,
"showXAxis": false,
"showYAxis": false,
"showGroup": false
},
"metrics": [
"mq_service_consume_latency",
"mq_service_consume_sla",
"mq_service_consume_cpm",
"mq_service_produce_sla",
"mq_service_produce_cpm"
],
"metricTypes": [
"readMetricsValues",
"readMetricsValues",
"readMetricsValues",
"readMetricsValues",
"readMetricsValues"
],
"moved": false,
"metricConfig": [
{
"unit": "ms",
"label": "Transmission Latency",
"calculation": "average"
},
{
"label": "Consume Successful Rate",
"unit": "%",
"calculation": "percentageAvg"
},
{
"label": "Consume Traffic",
"unit": "calls / min",
"calculation": "average"
},
{
"label": "Produce Successful Rate",
"unit": "%",
"calculation": "percentageAvg"
},
{
"label": "Produce Traffic",
"unit": "calls / min",
"calculation": "average"
}
]
}
],
"id": "Virtual-MQ-Root",
"layer": "VIRTUAL_MQ",
"entity": "All",
"name": "Virtual-MQ-Root",
"isRoot": true
}
}
]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
[
{
"id": "Virtual-MQ-Service",
"configuration": {
"children": [
{
"x": 0,
"y": 0,
"w": 6,
"h": 13,
"i": "1",
"type": "Widget",
"widget": {
"title": "Consume Traffic (calls / min)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_service_consume_cpm"
],
"metricTypes": [
"readMetricsValues"
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 6,
"y": 0,
"w": 6,
"h": 13,
"i": "2",
"type": "Widget",
"widget": {
"title": "Consume Access Successful Rate (%)",
"name": "Successful_Rate"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_service_consume_sla"
],
"metricTypes": [
"readMetricsValues"
],
"metricConfig": [
{
"calculation": "percentage"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 12,
"y": 0,
"w": 6,
"h": 13,
"i": "3",
"type": "Widget",
"widget": {
"title": "Transmission Latency Percentile (ms)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_service_consume_percentile"
],
"metricTypes": [
"readLabeledMetricsValues"
],
"metricConfig": [
{
"label": "P50, P75, P90, P95, P99",
"labelsIndex": "0,1,2,3,4"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 18,
"y": 0,
"w": 6,
"h": 13,
"i": "4",
"type": "Widget",
"widget": {
"title": "Transmission AVG Latency (ms)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_service_consume_latency"
],
"metricTypes": [
"readMetricsValues"
],
"metricConfig": [
{
"label": "P50, P75, P90, P95, P99",
"labelsIndex": "0,1,2,3,4"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 0,
"y": 13,
"w": 6,
"h": 13,
"i": "5",
"type": "Widget",
"widget": {
"title": "Produce Traffic (calls / min)"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_service_produce_cpm"
],
"metricTypes": [
"readMetricsValues"
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 6,
"y": 13,
"w": 6,
"h": 13,
"i": "6",
"type": "Widget",
"widget": {
"title": "Produce Access Successful Rate (%)",
"name": "Successful_Rate"
},
"graph": {
"type": "Line",
"step": false,
"smooth": false,
"showSymbol": false,
"showXAxis": true,
"showYAxis": true
},
"metrics": [
"mq_service_produce_sla"
],
"metricTypes": [
"readMetricsValues"
],
"metricConfig": [
{
"calculation": "percentage"
}
],
"associate": [
{
"widgetId": "1"
},
{
"widgetId": "2"
},
{
"widgetId": "3"
},
{
"widgetId": "4"
},
{
"widgetId": "5"
},
{
"widgetId": "6"
}
]
},
{
"x": 0,
"y": 26,
"w": 24,
"h": 26,
"i": "7",
"type": "Widget",
"graph": {
"type": "EndpointList",
"dashboardName": "Virtual-MQ-Endpoint",
"fontSize": 12,
"showXAxis": false,
"showYAxis": false,
"showGroup": true
},
"metrics": [
"mq_endpoint_consume_latency",
"mq_endpoint_consume_sla",
"mq_endpoint_consume_cpm",
"mq_endpoint_produce_sla",
"mq_endpoint_produce_cpm"
],
"metricTypes": [
"readMetricsValues",
"readMetricsValues",
"readMetricsValues",
"readMetricsValues",
"readMetricsValues"
],
"moved": false,
"metricConfig": [
{
"unit": "ms",
"label": "Transmission Latency",
"calculation": "average"
},
{
"label": "Consume Successful Rate",
"unit": "%",
"calculation": "percentageAvg"
},
{
"label": "Consume Traffic",
"unit": "calls / min",
"calculation": "average"
},
{
"label": "Produce Successful Rate",
"unit": "%",
"calculation": "percentageAvg"
},
{
"label": "Produce Traffic",
"unit": "calls / min",
"calculation": "average"
}
]
}
],
"layer": "VIRTUAL_MQ",
"entity": "Service",
"name": "Virtual-MQ-Service",
"id": "Virtual-MQ-Service",
"isRoot": false
}
}
]
Subproject commit 09051e916bb20233e0620e185a13e9aa1ef82dd1
Subproject commit 4232161d36d83716a9949dec1d5da63ec701f928
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
oap:
environment:
SW_METER_ANALYZER_ACTIVE_FILES:
extends:
file: ../../script/docker-compose/base-compose.yml
service: oap
ports:
- 12800
networks:
e2e:
provider:
extends:
file: ../../script/docker-compose/base-compose.yml
service: provider
environment:
SW_METER_REPORT_INTERVAL: 5
kafka_enable: "true"
kafka_topic: topic
kafka_server: kafka:9092
depends_on:
- oap
- kafka
ports:
- "9090:9090"
networks:
e2e:
consumer:
extends:
file: ../../script/docker-compose/base-compose.yml
service: consumer
environment:
SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap:11800
SW_AGENT_NAME: e2e-service-consumer
kafka_enable: "true"
kafka_topic: topic
kafka_server: kafka:9092
depends_on:
- oap
- kafka
ports:
- "9092:9092"
networks:
e2e:
zookeeper:
image: 'bitnami/zookeeper:latest'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
e2e:
kafka:
image: 'bitnami/kafka:latest'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
networks:
e2e:
networks:
e2e:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is used to show how to write configuration files and can be used to test.
setup:
env: compose
file: docker-compose.yml
timeout: 20m
init-system-environment: ../../script/env
steps:
- name: set PATH
command: export PATH=/tmp/skywalking-infra-e2e/bin:$PATH
- name: install yq
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh yq
- name: install swctl
command: bash test/e2e-v2/script/prepare/setup-e2e-shell/install.sh swctl
trigger:
action: http
interval: 3s
times: 10
url: http://${provider_host}:${provider_9090}/kafka/send
method: GET
verify:
retry:
count: 20
interval: 3s
cases:
- includes:
- mq-cases.yaml
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{- contains . }}
- key: {{ notEmpty .key }}
value: {{ ge .value 0 }}
{{- end }}
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
{{- contains . }}
- id: {{ b64enc "kafka:9092" }}.0_{{ b64enc "topic" }}
name: topic
{{- end}}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{- contains . }}
- id: {{ b64enc "kafka:9092" }}.0
name: kafka:9092
group: ""
shortname: kafka:9092
normal: false
layers:
- VIRTUAL_MQ
{{- end }}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cases:
# service list
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
expected: expected/service.yml
# service endpoint service-name=kafka:9092
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql endpoint list --service-id=a2Fma2E6OTA5Mg==.0
expected: expected/service-endpoint.yml
# endpoint metrics
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=mq_service_consume_latency --service-id=a2Fma2E6OTA5Mg==.0 |yq e 'to_entries' -
expected: expected/metrics-has-value0.yml
# endpoint metrics
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=mq_endpoint_consume_cpm --endpoint-name='topic' --service-id=a2Fma2E6OTA5Mg==.0 |yq e 'to_entries' -
expected: expected/metrics-has-value0.yml
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.e2e;
import java.io.IOException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@ConditionalOnExpression("#{'true'.equals(environment['kafka_enable'])}")
public class KafkaConsumer {
@PostConstruct
public void startConsumer() throws IOException, TimeoutException {
String topic = Optional.ofNullable(System.getenv("kafka_topic")).orElse("topic");
String server = Optional.ofNullable(System.getenv("kafka_server")).orElse("kafka:9092");
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", server);
config.put("group.id", "a");
config.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
final org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(config);
consumer.subscribe(Collections.singletonList(topic));
new Thread(() -> {
while (true) {
try {
final ConsumerRecords<String, String> poll = consumer.poll(Duration.ofHours(2));
poll.forEach(e -> log.info("receive msg : {}", e));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.e2e.mq;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@ConditionalOnExpression("#{'true'.equals(environment['kafka_enable'])}")
public class KafkaController {
private KafkaProducer<Object, Object> objectObjectKafkaProducer;
@GetMapping(value = "kafka/send")
public String sendMsg() throws ExecutionException, InterruptedException {
String topic = Optional.ofNullable(System.getenv("kafka_topic")).orElse("topic");
objectObjectKafkaProducer.send(
new ProducerRecord<>(topic, 0, System.currentTimeMillis(), "a".getBytes(), "test".getBytes())
).get();
return "ok";
}
@PostConstruct
public void init() throws UnknownHostException {
String server = Optional.ofNullable(System.getenv("kafka_server")).orElse("kafka:9092");
Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("bootstrap.servers", server);
config.put("acks", "all");
config.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
objectObjectKafkaProducer = new KafkaProducer<>(config);
}
}
......@@ -111,6 +111,13 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
<build>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册