From bd38ec492e013fec7ffc1359c9c8a7e9e1d52389 Mon Sep 17 00:00:00 2001 From: Zhenxu Date: Sun, 30 May 2021 22:35:55 +0800 Subject: [PATCH] Add HTTP implementation of logs reporting protocol (#7038) --- CHANGES.md | 1 + docs/en/protocols/Log-Data-Protocol.md | 40 +++++++- .../log/provider/LogModuleProvider.java | 9 +- .../{ => grpc}/LogReportServiceHandler.java | 2 +- .../rest/LogReportServiceRestHandler.java | 95 +++++++++++++++++++ 5 files changed, 144 insertions(+), 3 deletions(-) rename oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/{ => grpc}/LogReportServiceHandler.java (99%) create mode 100644 oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/rest/LogReportServiceRestHandler.java diff --git a/CHANGES.md b/CHANGES.md index 6aa69e0766..8074eda0b7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -56,6 +56,7 @@ Release Notes. * Fix counter misuse in the alarm core. Alarm can't be triggered in time. * Events can be configured as alarm source. * Make the number of core worker in meter converter thread pool configurable. +* Add HTTP implementation of logs reporting protocol. #### UI * Add logo for kong plugin. diff --git a/docs/en/protocols/Log-Data-Protocol.md b/docs/en/protocols/Log-Data-Protocol.md index 565bc97be2..19c08fccee 100644 --- a/docs/en/protocols/Log-Data-Protocol.md +++ b/docs/en/protocols/Log-Data-Protocol.md @@ -8,7 +8,7 @@ Report `native-proto` format log via gRPC. [gRPC service define](https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto) -## Native Json Protocol +## Native Kafka Protocol Report `native-json` format log via kafka. @@ -43,3 +43,41 @@ Json log record example: } ``` +## HTTP API + +Report `json` format logs via HTTP API, the endpoint is http://:12800/logs. + +Json log record example: + +```json +[ + { + "timestamp": 1618161813371, + "service": "Your_ApplicationName", + "serviceInstance": "3a5b8da5a5ba40c0b192e91b5c80f1a8@192.168.1.8", + "traceContext": { + "traceId": "ddd92f52207c468e9cd03ddd107cd530.69.16181331190470001", + "spanId": "0", + "traceSegmentId": "ddd92f52207c468e9cd03ddd107cd530.69.16181331190470000" + }, + "tags": { + "data": [ + { + "key": "level", + "value": "INFO" + }, + { + "key": "logger", + "value": "com.example.MyLogger" + } + ] + }, + "body": { + "text": { + "text": "log message" + } + } + } +] +``` + diff --git a/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/LogModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/LogModuleProvider.java index 62a862bf24..bbc1de78e0 100644 --- a/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/LogModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/LogModuleProvider.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.recevier.log.provider; import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; @@ -27,7 +28,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule; import org.apache.skywalking.oap.server.recevier.log.module.LogModule; -import org.apache.skywalking.oap.server.recevier.log.provider.handler.LogReportServiceHandler; +import org.apache.skywalking.oap.server.recevier.log.provider.handler.grpc.LogReportServiceHandler; +import org.apache.skywalking.oap.server.recevier.log.provider.handler.rest.LogReportServiceRestHandler; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; public class LogModuleProvider extends ModuleProvider { @@ -59,6 +61,11 @@ public class LogModuleProvider extends ModuleProvider { .getService(GRPCHandlerRegister.class); grpcHandlerRegister.addHandler(new LogReportServiceHandler(getManager())); + + JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME) + .provider() + .getService(JettyHandlerRegister.class); + jettyHandlerRegister.addHandler(new LogReportServiceRestHandler(getManager())); } @Override diff --git a/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/LogReportServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/grpc/LogReportServiceHandler.java similarity index 99% rename from oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/LogReportServiceHandler.java rename to oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/grpc/LogReportServiceHandler.java index 48ced43469..e47b363f10 100644 --- a/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/LogReportServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/grpc/LogReportServiceHandler.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.skywalking.oap.server.recevier.log.provider.handler; +package org.apache.skywalking.oap.server.recevier.log.provider.handler.grpc; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; diff --git a/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/rest/LogReportServiceRestHandler.java b/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/rest/LogReportServiceRestHandler.java new file mode 100644 index 0000000000..d0d142ca2f --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-log-recevier-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/log/provider/handler/rest/LogReportServiceRestHandler.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.recevier.log.provider.handler.rest; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import java.io.BufferedReader; +import java.util.ArrayList; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.logging.v3.LogData; +import org.apache.skywalking.oap.log.analyzer.module.LogAnalyzerModule; +import org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; +import org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; + +@Slf4j +public class LogReportServiceRestHandler extends JettyHandler { + private final Gson gson = new Gson(); + + private final HistogramMetrics histogram; + + private final CounterMetrics errorCounter; + + private final ILogAnalyzerService logAnalyzerService; + + public LogReportServiceRestHandler(final ModuleManager moduleManager) { + final MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + + logAnalyzerService = moduleManager.find(LogAnalyzerModule.NAME) + .provider() + .getService(ILogAnalyzerService.class); + + histogram = metricsCreator.createHistogramMetric( + "log_in_latency", "The process latency of log", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("http") + ); + errorCounter = metricsCreator.createCounter( + "log_analysis_error_count", "The error number of log analysis", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("http") + ); + } + + @Override + protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) { + try (final HistogramMetrics.Timer ignored = histogram.createTimer()) { + final BufferedReader reader = req.getReader(); + final StringBuilder content = new StringBuilder(); + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + content.append(line); + } + final JsonArray array = gson.fromJson(content.toString(), JsonArray.class); + final ArrayList logs = new ArrayList<>(array.size()); + for (final JsonElement it : array) { + final LogData.Builder builder = LogData.newBuilder(); + ProtoBufJsonUtils.fromJSON(it.toString(), builder); + logs.add(builder); + } + logs.forEach(logAnalyzerService::doAnalysis); + } catch (final Exception e) { + log.error(e.getMessage(), e); + errorCounter.inc(); + } + } + + @Override + public String pathSpec() { + return "/logs"; + } +} -- GitLab