未验证 提交 8436135d 编写于 作者: L liqiangz 提交者: GitHub

Add MeterReportService collectBatch method. (#8165)

上级 9f585527
......@@ -55,6 +55,7 @@ Release Notes.
* Fix concurrency bug in MAL `increase`-related calculation.
* Fix a null pointer bug when building `SampleFamily`.
* Fix the so11y latency of persistence execution latency not correct in ElasticSearch storage.
* Add `MeterReportService` `collectBatch` method.
#### UI
......
Subproject commit 21492e496b797567d0e127f4510509baf73e10fd
Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a
......@@ -22,6 +22,7 @@ import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.IMeterProcessService;
import org.apache.skywalking.oap.server.analyzer.provider.meter.process.MeterProcessor;
......@@ -87,4 +88,33 @@ public class MeterServiceHandler extends MeterReportServiceGrpc.MeterReportServi
}
};
}
@Override
public StreamObserver<MeterDataCollection> collectBatch(StreamObserver<Commands> responseObserver) {
return new StreamObserver<MeterDataCollection>() {
@Override
public void onNext(MeterDataCollection meterDataCollection) {
final MeterProcessor processor = processService.createProcessor();
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
meterDataCollection.getMeterDataList().forEach(processor::read);
processor.process();
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
}
}
@Override
public void onError(Throwable throwable) {
log.error(throwable.getMessage(), throwable);
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
responseObserver.onNext(Commands.newBuilder().build());
responseObserver.onCompleted();
}
};
}
}
......@@ -42,4 +42,4 @@ verify:
interval: 3s
cases:
- includes:
- ../../meter/meter-cases.yaml
- meter-cases.yaml
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{- contains . }}
- key: {{ notEmpty .key }}
value: {{ ge .value 1 }}
{{- end }}
# Licensed to Apache Software Foundation (ASF) under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Apache Software Foundation (ASF) licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
{{- contains . }}
- id: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }}
name: provider1
attributes:
{{- contains .attributes }}
- name: OS Name
value: Linux
- name: hostname
value: {{ notEmpty .value }}
- name: Process No.
value: "1"
- name: Start Time
value: {{ notEmpty .value }}
- name: JVM Arguments
value: '{{ notEmpty .value }}'
- name: Jar Dependencies
value: '{{ notEmpty .value }}'
- name: ipv4s
value: {{ notEmpty .value }}
{{- end}}
language: JAVA
instanceuuid: {{ b64enc "e2e-service-provider" }}.1_{{ b64enc "provider1" }}
{{- end}}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{- contains . }}
- id: {{ b64enc "e2e-service-provider" }}.1
name: e2e-service-provider
group: ""
{{- end }}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cases:
# service list
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql service ls
expected: expected/service.yml
# service instance list
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
expected: expected/service-instance.yml
# service instance metrics linear
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_jvm_memory_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: expected/metrics-has-value.yml
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_jvm_threads_live --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: expected/metrics-has-value.yml
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: expected/metrics-has-value.yml
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
expSuffix: instance(['service'], ['instance'])
metricPrefix: batch
metricsRules:
- name: test
exp: batch_test
......@@ -18,10 +18,12 @@ version: '2.1'
services:
oap:
environment:
SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth
SW_METER_ANALYZER_ACTIVE_FILES: spring-sleuth,batch-meter
extends:
file: ../../script/docker-compose/base-compose.yml
service: oap
volumes:
- ./batch-meter.yaml:/skywalking/config/meter-analyzer-config/batch-meter.yaml
ports:
- 12800
......@@ -37,5 +39,26 @@ services:
ports:
- 9090
sender:
image: "adoptopenjdk/openjdk8:alpine-jre"
volumes:
- ./../../java-test-service/e2e-mock-sender/target/e2e-mock-sender-2.0.0.jar:/e2e-mock-sender-2.0.0.jar
command: [ "java", "-jar", "/e2e-mock-sender-2.0.0.jar" ]
environment:
OAP_HOST: oap
OAP_GRPC_PORT: 11800
networks:
- e2e
ports:
- 9093
healthcheck:
test: ["CMD", "sh", "-c", "nc -nz 127.0.0.1 9093"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
oap:
condition: service_healthy
networks:
e2e:
......@@ -27,3 +27,8 @@
expected: expected/metrics-has-value.yml
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=meter_process_files_max --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: expected/metrics-has-value.yml
- query: |
curl -s -XPOST http://${sender_host}:${sender_9093}/sendBatchMetrics > /dev/null;
sleep 10;
swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name=batch_test --instance-name=test-instance --service-name=test-service |yq e 'to_entries' -
expected: expected/metrics-has-value.yml
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.e2e.controller;
import io.grpc.ManagedChannel;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CountDownLatch;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterDataCollection;
import org.apache.skywalking.apm.network.language.agent.v3.MeterReportServiceGrpc;
import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue;
import org.apache.skywalking.e2e.E2EConfiguration;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MeterMetricSenderController {
private static final int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50;
private final MeterReportServiceGrpc.MeterReportServiceStub grpcStub;
public MeterMetricSenderController(final E2EConfiguration configuration) {
final ManagedChannel channel = NettyChannelBuilder.forAddress(
configuration.getOapHost(), Integer.parseInt(configuration.getOapGrpcPort()))
.nameResolverFactory(new DnsNameResolverProvider())
.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.usePlaintext()
.build();
grpcStub = MeterReportServiceGrpc.newStub(channel);
}
@PostMapping("/sendBatchMetrics")
public String sendBatchMetrics() throws Exception {
final MeterDataCollection.Builder builder =
MeterDataCollection.newBuilder()
.addMeterData(MeterData.newBuilder()
.setService("test-service")
.setTimestamp(System.currentTimeMillis())
.setServiceInstance("test-instance")
.setSingleValue(MeterSingleValue.newBuilder()
.setName("batch_test")
.setValue(100)
.build())
.build());
sendMetrics(builder.build());
return "Metrics send success!";
}
void sendMetrics(final MeterDataCollection metrics) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
StreamObserver<MeterDataCollection> collect = grpcStub.collectBatch(new StreamObserver<Commands>() {
@Override
public void onNext(final Commands commands) {
}
@Override
public void onError(final Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
collect.onNext(metrics);
collect.onCompleted();
latch.await();
}
}
Subproject commit 21492e496b797567d0e127f4510509baf73e10fd
Subproject commit fbbe955545fd2c942ca59cd05720a084d010bb8a
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册