From 5cdd4ed50f2a188efbec753f8a04c9008aa4f769 Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Sun, 28 Nov 2021 22:30:12 +0800 Subject: [PATCH] Introduce the Customized ALS Protocol for Satellite (#8193) --- CHANGES.md | 1 + .../envoy/AccessLogServiceGRPCHandler.java | 9 +++- .../envoy/EnvoyMetricReceiverProvider.java | 1 + ...atelliteAccessLogServiceGRPCHandlerV3.java | 35 +++++++++++++++ .../satellite/envoy/accesslog/v3/als.proto | 43 +++++++++++++++++++ 5 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/SatelliteAccessLogServiceGRPCHandlerV3.java create mode 100644 oap-server/server-receiver-plugin/receiver-proto/src/main/proto/satellite/envoy/accesslog/v3/als.proto diff --git a/CHANGES.md b/CHANGES.md index 6a6128911e..c06c95c806 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -60,6 +60,7 @@ Release Notes. * Upgrade grpc-java to 1.42.1 and protoc to 3.19.1 to allow using native Mac osx-aarch_64 artifacts. * Fix TopologyQuery.loadEndpointRelation bug. * Support using IoTDB as a new storage option. +* Add customized envoy ALS protocol receiver for satellite transmit batch data. #### UI diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java index 1915a44f45..520f1c9e6a 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java @@ -95,7 +95,12 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS @Override public StreamObserver streamAccessLogs( - StreamObserver responseObserver) { + StreamObserver responseObserver) { + return streamAccessLogs(responseObserver, false); + } + + public StreamObserver streamAccessLogs( + StreamObserver responseObserver, boolean alwaysAnalyzeIdentity) { return new StreamObserver() { private volatile boolean isFirst = true; private Role role; @@ -105,7 +110,7 @@ public class AccessLogServiceGRPCHandler extends AccessLogServiceGrpc.AccessLogS public void onNext(StreamAccessLogsMessage message) { HistogramMetrics.Timer timer = histogram.createTimer(); try { - if (isFirst) { + if (isFirst || (alwaysAnalyzeIdentity && message.hasIdentifier())) { identifier = message.getIdentifier(); isFirst = false; role = Role.NONE; diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java index 2678f87dfd..884b7e1aea 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java @@ -84,6 +84,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider { final AccessLogServiceGRPCHandler handler = new AccessLogServiceGRPCHandler(getManager(), config); service.addHandler(handler); service.addHandler(new AccessLogServiceGRPCHandlerV3(handler)); + service.addHandler(new SatelliteAccessLogServiceGRPCHandlerV3(handler)); } @Override diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/SatelliteAccessLogServiceGRPCHandlerV3.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/SatelliteAccessLogServiceGRPCHandlerV3.java new file mode 100644 index 0000000000..d811321c94 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/SatelliteAccessLogServiceGRPCHandlerV3.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy; + +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage; +import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsResponse; +import io.grpc.stub.StreamObserver; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.satellite.envoy.accesslog.v3.SatelliteAccessLogServiceGrpc; + +@RequiredArgsConstructor +public class SatelliteAccessLogServiceGRPCHandlerV3 extends SatelliteAccessLogServiceGrpc.SatelliteAccessLogServiceImplBase { + private final AccessLogServiceGRPCHandler delegate; + + @Override + public StreamObserver streamAccessLogs(StreamObserver responseObserver) { + return delegate.streamAccessLogs(responseObserver, true); + } +} diff --git a/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/satellite/envoy/accesslog/v3/als.proto b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/satellite/envoy/accesslog/v3/als.proto new file mode 100644 index 0000000000..0d54814cf1 --- /dev/null +++ b/oap-server/server-receiver-plugin/receiver-proto/src/main/proto/satellite/envoy/accesslog/v3/als.proto @@ -0,0 +1,43 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +syntax = "proto3"; + +package satellite.envoy.accesslog.v3; + +import "envoy/service/accesslog/v3/als.proto"; + +option java_package = "org.apache.skywalking.satellite.envoy.accesslog.v3"; +option java_outer_classname = "SatelliteAlsProto"; +option java_multiple_files = true; +option java_generic_services = true; + +// [#protodoc-title: Satellite gRPC Access Log Service (ALS)] + +// The new Envoy ALS protocol, work for satellite transmit the ALS message to oap. +service SatelliteAccessLogService { + // Use the same parameters to transmit access log messages. + // The only difference is that the identity information (StreamAccessLogsMessage#identity) may occur on each message. + // Rely on the streaming messages are orderly, so there will be no problems with message processing. + // Therefore, when the satellite transmits the ALS message, it does not need to open, send and close the stream for each different identity (envoy). + // As a result, unnecessary streaming operation requests could be reduced, and the satellite becomes more stable when the satellite sends requests to the upstream. + // Especially when the number of envoys increases, the optimization becomes more obvious. + rpc StreamAccessLogs(stream .envoy.service.accesslog.v3.StreamAccessLogsMessage) returns (.envoy.service.accesslog.v3.StreamAccessLogsResponse) { + } +} -- GitLab