From 8436135dc919351a6d0ab42ff48c85e1b3104b53 Mon Sep 17 00:00:00 2001 From: liqiangz Date: Mon, 22 Nov 2021 22:11:39 +0800 Subject: [PATCH] Add MeterReportService collectBatch method. (#8165) --- CHANGES.md | 1 + apm-protocol/apm-network/src/main/proto | 2 +- .../provider/handler/MeterServiceHandler.java | 30 ++++++ test/e2e-v2/cases/kafka/meter/e2e.yaml | 2 +- .../meter/expected/metrics-has-value.yml | 19 ++++ .../kafka/meter/expected/service-instance.yml | 40 ++++++++ .../cases/kafka/meter/expected/service.yml | 20 ++++ .../e2e-v2/cases/kafka/meter/meter-cases.yaml | 29 ++++++ test/e2e-v2/cases/meter/batch-meter.yaml | 20 ++++ test/e2e-v2/cases/meter/docker-compose.yml | 25 ++++- test/e2e-v2/cases/meter/meter-cases.yaml | 5 + .../MeterMetricSenderController.java | 97 +++++++++++++++++++ .../e2e-protocol/src/main/proto | 2 +- 13 files changed, 288 insertions(+), 4 deletions(-) create mode 100644 test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml create mode 100644 test/e2e-v2/cases/kafka/meter/expected/service-instance.yml create mode 100644 test/e2e-v2/cases/kafka/meter/expected/service.yml create mode 100644 test/e2e-v2/cases/kafka/meter/meter-cases.yaml create mode 100644 test/e2e-v2/cases/meter/batch-meter.yaml create mode 100644 test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java diff --git a/CHANGES.md b/CHANGES.md index 6283e24edb..0883217011 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -55,6 +55,7 @@ Release Notes. * Fix concurrency bug in MAL `increase`-related calculation. * Fix a null pointer bug when building `SampleFamily`. * Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage. +* Add `MeterReportService` `collectBatch` method. #### UI diff --git a/apm-protocol/apm-network/src/main/proto b/apm-protocol/apm-network/src/main/proto index 21492e496b..fbbe955545 160000 --- a/apm-protocol/apm-network/src/main/proto +++ b/apm-protocol/apm-network/src/main/proto @@ -1 +1 @@ -Subproject commit 21492e496b797567d0e127f4510509baf73e10fd +Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a diff --git a/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java index b2f0d2e4c9..73cdd445c7 100644 --- a/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java @@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.network.common.v3.Commands; import org.apache.skywalking.apm.network.language.agent.v3.MeterData; +import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection; import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc; import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService; import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor; @@ -87,4 +88,33 @@ public class MeterServiceHandler extends MeterReportServiceGrpc.MeterReportServi } }; } + + @Override + public StreamObserver collectBatch(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(MeterDataCollection meterDataCollection) { + final MeterProcessor processor = processService.createProcessor(); + try (HistogramMetrics.Timer ignored = histogram.createTimer()) { + meterDataCollection.getMeterDataList().forEach(processor::read); + processor.process(); + } catch (Exception e) { + errorCounter.inc(); + log.error(e.getMessage(), e); + } + } + + @Override + public void onError(Throwable throwable) { + log.error(throwable.getMessage(), throwable); + responseObserver.onCompleted(); + } + + @Override + public void onCompleted() { + responseObserver.onNext(Commands.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + } } diff --git a/test/e2e-v2/cases/kafka/meter/e2e.yaml b/test/e2e-v2/cases/kafka/meter/e2e.yaml index 4fff53555d..398897dccd 100644 --- a/test/e2e-v2/cases/kafka/meter/e2e.yaml +++ b/test/e2e-v2/cases/kafka/meter/e2e.yaml @@ -42,4 +42,4 @@ verify: interval: 3s cases: - includes: - - ../../meter/meter-cases.yaml + - meter-cases.yaml diff --git a/test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml b/test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml new file mode 100644 index 0000000000..5359e6d315 --- /dev/null +++ b/test/e2e-v2/cases/kafka/meter/expected/metrics-has-value.yml @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{{- contains . }} +- key: {{ notEmpty .key }} + value: {{ ge .value 1 }} +{{- end }} diff --git a/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml b/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml new file mode 100644 index 0000000000..e5252050a3 --- /dev/null +++ b/test/e2e-v2/cases/kafka/meter/expected/service-instance.yml @@ -0,0 +1,40 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +{{- contains . }} +- id: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }} + name: provider1 + attributes: + {{- contains .attributes }} + - name: OS Name + value: Linux + - name: hostname + value: {{ notEmpty .value }} + - name: Process No. + value: "1" + - name: Start Time + value: {{ notEmpty .value }} + - name: JVM Arguments + value: '{{ notEmpty .value }}' + - name: Jar Dependencies + value: '{{ notEmpty .value }}' + - name: ipv4s + value: {{ notEmpty .value }} + {{- end}} + language: JAVA + instanceuuid: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }} +{{- end}} diff --git a/test/e2e-v2/cases/kafka/meter/expected/service.yml b/test/e2e-v2/cases/kafka/meter/expected/service.yml new file mode 100644 index 0000000000..3a333487d9 --- /dev/null +++ b/test/e2e-v2/cases/kafka/meter/expected/service.yml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{{- contains . }} +- id: {{ b64enc "e2e-service-provider" }}.1 + name: e2e-service-provider + group: "" +{{- end }} diff --git a/test/e2e-v2/cases/kafka/meter/meter-cases.yaml b/test/e2e-v2/cases/kafka/meter/meter-cases.yaml new file mode 100644 index 0000000000..ce40b82c1b --- /dev/null +++ b/test/e2e-v2/cases/kafka/meter/meter-cases.yaml @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + cases: + # service list + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls + expected: expected/service.yml + # service instance list + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider + expected: expected/service-instance.yml + # service instance metrics linear + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_jvm_memory_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_jvm_threads_live --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - + expected: expected/metrics-has-value.yml + - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - + expected: expected/metrics-has-value.yml \ No newline at end of file diff --git a/test/e2e-v2/cases/meter/batch-meter.yaml b/test/e2e-v2/cases/meter/batch-meter.yaml new file mode 100644 index 0000000000..625d23a2aa --- /dev/null +++ b/test/e2e-v2/cases/meter/batch-meter.yaml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +expSuffix: instance(['service'], ['instance']) +metricPrefix: batch +metricsRules: + - name: test + exp: batch_test diff --git a/test/e2e-v2/cases/meter/docker-compose.yml b/test/e2e-v2/cases/meter/docker-compose.yml index 7ab0404ab2..7590a7d3c0 100644 --- a/test/e2e-v2/cases/meter/docker-compose.yml +++ b/test/e2e-v2/cases/meter/docker-compose.yml @@ -18,10 +18,12 @@ version: '2.1' services: oap: environment: - SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth + SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth,batch-meter extends: file: ../../script/docker-compose/base-compose.yml service: oap + volumes: + - ./batch-meter.yaml:/skywalking/config/meter-analyzer-config/batch-meter.yaml ports: - 12800 @@ -37,5 +39,26 @@ services: ports: - 9090 + sender: + image: "adoptopenjdk/openjdk8:alpine-jre" + volumes: + - ./../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar + command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ] + environment: + OAP_HOST: oap + OAP_GRPC_PORT: 11800 + networks: + - e2e + ports: + - 9093 + healthcheck: + test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + oap: + condition: service_healthy + networks: e2e: diff --git a/test/e2e-v2/cases/meter/meter-cases.yaml b/test/e2e-v2/cases/meter/meter-cases.yaml index d1eeb7c0ad..e3bae75aac 100644 --- a/test/e2e-v2/cases/meter/meter-cases.yaml +++ b/test/e2e-v2/cases/meter/meter-cases.yaml @@ -27,3 +27,8 @@ expected: expected/metrics-has-value.yml - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' - expected: expected/metrics-has-value.yml + - query: | + curl -s -XPOST http://${sender_host}:${sender_9093}/sendBatchMetrics > /dev/null; + sleep 10; + swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=batch_test --instance-name=test-instance --service-name=test-service |yq e 'to_entries' - + expected: expected/metrics-has-value.yml \ No newline at end of file diff --git a/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java new file mode 100644 index 0000000000..876ed23dd7 --- /dev/null +++ b/test/e2e-v2/java-test-service/e2e-mock-sender/src/main/java/org/apache/skywalking/e2e/controller/MeterMetricSenderController.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.e2e.controller; + +import io.grpc.ManagedChannel; +import io.grpc.internal.DnsNameResolverProvider; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CountDownLatch; +import org.apache.skywalking.apm.network.common.v3.Commands; +import org.apache.skywalking.apm.network.language.agent.v3.MeterData; +import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection; +import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc; +import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue; +import org.apache.skywalking.e2e.E2EConfiguration; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class MeterMetricSenderController { + private static final int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50; + private final MeterReportServiceGrpc.MeterReportServiceStub grpcStub; + + public MeterMetricSenderController(final E2EConfiguration configuration) { + final ManagedChannel channel = NettyChannelBuilder.forAddress( + configuration.getOapHost(), Integer.parseInt(configuration.getOapGrpcPort())) + .nameResolverFactory(new DnsNameResolverProvider()) + .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) + .usePlaintext() + .build(); + + grpcStub = MeterReportServiceGrpc.newStub(channel); + } + + @PostMapping("/sendBatchMetrics") + public String sendBatchMetrics() throws Exception { + final MeterDataCollection.Builder builder = + MeterDataCollection.newBuilder() + .addMeterData(MeterData.newBuilder() + .setService("test-service") + .setTimestamp(System.currentTimeMillis()) + .setServiceInstance("test-instance") + .setSingleValue(MeterSingleValue.newBuilder() + .setName("batch_test") + .setValue(100) + .build()) + .build()); + + sendMetrics(builder.build()); + + return "Metrics send success!"; + } + + void sendMetrics(final MeterDataCollection metrics) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + StreamObserver collect = grpcStub.collectBatch(new StreamObserver() { + @Override + public void onNext(final Commands commands) { + + } + + @Override + public void onError(final Throwable throwable) { + throwable.printStackTrace(); + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }); + + collect.onNext(metrics); + + collect.onCompleted(); + + latch.await(); + } +} diff --git a/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto b/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto index 21492e496b..fbbe955545 160000 --- a/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto +++ b/test/e2e-v2/java-test-service/e2e-protocol/src/main/proto @@ -1 +1 @@ -Subproject commit 21492e496b797567d0e127f4510509baf73e10fd +Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a -- GitLab