Unverified commit 93fcf8d0, authored by kezhenxu94, committed by GitHub

Add APIs to query ondemand Pod log (#9130)

Parent 1934f308
......@@ -61,7 +61,7 @@
* Storage(ElasticSearch): add search options to tolerate inexisting indices.
* Fix the problem that `MQ` has the wrong `Layer` type.
* Fix NoneStream model has wrong downsampling(was Second, should be Minute).
* SQL Database: provide `@SQLDatabase.AdditionalEntity` to support create additional tables from a model.
* [Breaking Change] SQL Database: remove SQL Database config `maxSizeOfArrayColumn` and `numOfSearchableValuesPerTag`.
* [Breaking Change] SQL Database: move `Tags list` from `Segment`,`Logs`,`Alarms` to their additional table.
* [Breaking Change] Remove `total` field in Trace, Log, Event, Browser log, and alarm list query.
......@@ -72,6 +72,7 @@
* Fix searchableTag as `rpc.status_code` and `http.status_code`. `status_code` had been removed.
* Fix scroll query failure exception.
* Add `profileDataQueryBatchSize` config in Elasticsearch Storage.
* Add APIs to query Pod log on demand.
#### UI
......
......@@ -227,6 +227,7 @@ The Configuration Vocabulary lists all available configurations provided by `app
| - | - | enableLogTestTool | Enable the log testing API to test the LAL. **NOTE**: This API evaluates untrusted code on the OAP server. A malicious script can do significant damage (steal keys and secrets, remove files and directories, install malware, etc). As such, please enable this API only when you completely trust your users. | SW_QUERY_GRAPHQL_ENABLE_LOG_TEST_TOOL | false |
| - | - | maxQueryComplexity | Maximum complexity allowed for the GraphQL query that can be used to abort a query if the total number of data fields queried exceeds the defined threshold. | SW_QUERY_MAX_QUERY_COMPLEXITY | 1000 |
| - | - | enableUpdateUITemplate | Allow user add,disable and update UI template. | SW_ENABLE_UPDATE_UI_TEMPLATE | false |
| - | - | enableOnDemandPodLog | On-demand Pod log: fetch Pod logs on users' demand. The logs are fetched and displayed in real time and are not persisted in any form. This is helpful when users want to run experiments, monitor the logs, and see what's happening inside the service. Note: if you print secrets in the logs, they are also visible in the UI, so for the sake of security this feature is disabled by default; set this configuration to enable it manually. | SW_ENABLE_ON_DEMAND_POD_LOG | false |
| alarm | default | - | Read [alarm doc](backend-alarm.md) for more details. | - | |
| telemetry | - | - | Read [telemetry doc](backend-telemetry.md) for more details. | - | |
| - | none | - | No op implementation. | - | |
......
# On Demand Pod Logs
This feature fetches Pod logs on users' demand. The logs are fetched and displayed in real time
and are not persisted in any form. This is helpful when users want to run experiments, monitor the
logs, and see what's happening inside the service.
Note: if you print secrets in the logs, they are also visible in the UI, so for the sake of security this
feature is disabled by default. Please read the configuration documentation to enable it manually.
## How it works
As the name indicates, this feature only works for Kubernetes Pods.
SkyWalking OAP lists the Kubernetes namespaces, services, Pods and containers in the UI for users to select.
Once the user makes a selection, the UI fetches the logs at a given interval and displays them.
For this feature to work properly, configure in advance a cluster role that allows
OAP to list/get namespaces, services, pods and pods/log.
......@@ -133,6 +133,8 @@ catalog:
catalog:
- name: "Log Collecting And Analysis"
path: "/en/setup/backend/log-analyzer"
- name: "On Demand Pod Logs"
path: "/en/setup/backend/on-demand-pod-log"
- name: "Extension"
catalog:
- name: "Metrics Exporter"
......
......@@ -20,11 +20,13 @@ package org.apache.skywalking.oap.server.core.query.input;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.query.DurationUtils;
import org.apache.skywalking.oap.server.core.query.PointOfTime;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
@Getter
@Setter
public class Duration {
private String start;
private String end;
......
......@@ -24,6 +24,7 @@ import lombok.Setter;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.type.Pagination;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.TraceSource;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
@Getter
......@@ -40,4 +41,5 @@ public class TraceQueryCondition {
private QueryOrder queryOrder;
private Pagination paging;
private List<Tag> tags;
private TraceSource prioritizedSource;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.query.type;
public enum TraceSource {
SKYWALKING, ZIPKIN
}
......@@ -63,5 +63,10 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</dependency>
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
</dependency>
</dependencies>
</project>
......@@ -31,4 +31,5 @@ public class GraphQLQueryConfig extends ModuleConfig {
private boolean enableLogTestTool;
private int maxQueryComplexity = 1000;
private boolean enableUpdateUITemplate = false;
private boolean enableOnDemandPodLog = false;
}
......@@ -40,6 +40,7 @@ import org.apache.skywalking.oap.query.graphql.resolver.Mutation;
import org.apache.skywalking.oap.query.graphql.resolver.ProfileMutation;
import org.apache.skywalking.oap.query.graphql.resolver.ProfileQuery;
import org.apache.skywalking.oap.query.graphql.resolver.Query;
import org.apache.skywalking.oap.query.graphql.resolver.OndemandLogQuery;
import org.apache.skywalking.oap.query.graphql.resolver.TopNRecordsQuery;
import org.apache.skywalking.oap.query.graphql.resolver.TopologyQuery;
import org.apache.skywalking.oap.query.graphql.resolver.TraceQuery;
......@@ -119,8 +120,15 @@ public class GraphQLQueryProvider extends ModuleProvider {
.file("query-protocol/metadata-v2.graphqls")
.resolvers(new MetadataQueryV2(getManager()))
.file("query-protocol/ebpf-profiling.graphqls")
.resolvers(new EBPFProcessProfilingQuery(getManager()), new EBPFProcessProfilingMutation(getManager()))
.scalars(ExtendedScalars.GraphQLLong);
.resolvers(new EBPFProcessProfilingQuery(getManager()), new EBPFProcessProfilingMutation(getManager()));
if (config.isEnableOnDemandPodLog()) {
schemaBuilder
.file("query-protocol/ondemand-pod-log.graphqls")
.resolvers(new OndemandLogQuery());
}
schemaBuilder.scalars(ExtendedScalars.GraphQLLong);
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.query.graphql.resolver;
import static java.util.Comparator.comparing;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import org.apache.skywalking.oap.query.graphql.type.InternalLog;
import org.apache.skywalking.oap.query.graphql.type.LogAdapter;
import org.apache.skywalking.oap.query.graphql.type.OndemandLogQueryCondition;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Log;
import org.apache.skywalking.oap.server.core.query.type.Logs;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import graphql.kickstart.tools.GraphQLQueryResolver;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1NamespaceList;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.util.Config;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class OndemandLogQuery implements GraphQLQueryResolver {
private final Gson gson = new Gson();
private final Type responseType = new TypeToken<Map<String, Object>>() {
}.getType();
private CoreV1Api kApi;
public List<String> listNamespaces() throws IOException {
try {
final V1NamespaceList nsList =
kApi().listNamespace(null, null, null, null, null, null, null, null, null, null);
return nsList
.getItems()
.stream()
.map(V1Namespace::getMetadata)
.filter(Objects::nonNull)
.map(V1ObjectMeta::getName)
.collect(Collectors.toList());
} catch (ApiException e) {
log.error("Failed to list namespaces from Kubernetes, {}", e.getResponseBody(), e);
Map<String, Object> responseBody = gson.fromJson(e.getResponseBody(), responseType);
String message = responseBody.getOrDefault("message", e.getCode()).toString();
throw new RuntimeException(message);
}
}
public List<String> listContainers(final OndemandLogQueryCondition condition)
throws IOException {
final String ns = condition.getNamespace();
final IDManager.ServiceInstanceID.InstanceIDDefinition instanceIDDefinition =
IDManager.ServiceInstanceID.analysisId(condition.getServiceInstanceId());
final String instanceName = instanceIDDefinition.getName();
try {
final V1Pod pod = kApi().readNamespacedPod(instanceName, ns, null);
final V1PodSpec spec = pod.getSpec();
if (isNull(spec)) {
throw new RuntimeException(String.format("No spec: %s:%s", ns, instanceName));
}
final List<String> containers = spec.getContainers().stream()
.map(V1Container::getName)
.collect(Collectors.toList());
if (nonNull(spec.getInitContainers())) {
final List<String> init = spec.getInitContainers().stream()
.map(V1Container::getName)
.collect(Collectors.toList());
containers.addAll(init);
}
return containers;
} catch (ApiException e) {
log.error("Failed to list containers from Kubernetes, {}", e.getResponseBody(), e);
Map<String, Object> responseBody = gson.fromJson(e.getResponseBody(), responseType);
String message = responseBody.getOrDefault("message", e.getCode()).toString();
throw new RuntimeException(message);
}
}
public Logs ondemandPodLogs(OndemandLogQueryCondition condition)
throws IOException {
final String ns = condition.getNamespace();
final IDManager.ServiceInstanceID.InstanceIDDefinition instanceIDDefinition =
IDManager.ServiceInstanceID.analysisId(condition.getServiceInstanceId());
final String instanceName = instanceIDDefinition.getName();
try {
final V1Pod pod = kApi().readNamespacedPod(instanceName, ns, null);
final V1ObjectMeta podMetadata = pod.getMetadata();
if (isNull(podMetadata)) {
throw new RuntimeException(
String.format("No such instance: %s:%s", ns, instanceName));
}
final V1PodSpec spec = pod.getSpec();
if (isNull(spec)) {
throw new RuntimeException(String.format("No spec: %s:%s", ns, instanceName));
}
final Duration duration = new Duration();
duration.setStart(condition.getDuration().getStart());
duration.setEnd(condition.getDuration().getEnd());
duration.setStep(condition.getDuration().getStep());
final long since = duration.getStartTimestamp() / 1000;
final String container = condition.getContainer();
final String podLog = kApi().readNamespacedPodLog(
podMetadata.getName(),
podMetadata.getNamespace(),
container,
false, null, null, null, null, (int) since, null, true);
final List<InternalLog> logs = Splitter.on("\n").omitEmptyStrings()
.splitToList(Strings.nullToEmpty(podLog))
.stream()
.filter(StringUtil::isNotBlank)
.map(it -> InternalLog.builder()
.line(it)
.container(container)
.build())
.collect(Collectors.toList());
final List<Log> filtered = filter(condition, logs);
final List<Log> limited =
filtered
.stream()
.limit(10000)
.collect(Collectors.toList());
final Logs result = new Logs();
result.getLogs().addAll(limited);
return result;
} catch (ApiException e) {
log.error("Failed to fetch logs from Kubernetes, {}", e.getResponseBody(), e);
Map<String, Object> responseBody = gson.fromJson(e.getResponseBody(), responseType);
String message = responseBody.getOrDefault("message", e.getCode()).toString();
throw new RuntimeException(message);
}
}
private List<Log> filter(
final OndemandLogQueryCondition request,
final List<InternalLog> logs) {
final Duration duration = new Duration();
duration.setStart(request.getDuration().getStart());
duration.setEnd(request.getDuration().getEnd());
duration.setStep(request.getDuration().getStep());
final long since = duration.getStartTimestamp() / 1000;
final long to = duration.getEndTimestamp() / 1000;
final List<String> inclusions = request.getKeywordsOfContent();
final Predicate<Log> inclusivePredicate = l -> inclusions.isEmpty() ||
inclusions.stream().anyMatch(k -> l.getContent().matches(k));
final List<String> exclusions = request.getExcludingKeywordsOfContent();
final Predicate<Log> exclusivePredicate = l -> exclusions.isEmpty() ||
exclusions.stream().noneMatch(k -> l.getContent().matches(k));
return logs.stream()
.map(LogAdapter::new).map(LogAdapter::adapt)
.filter(inclusivePredicate)
.filter(exclusivePredicate)
.filter(it -> it.getTimestamp() >= since)
.filter(it -> it.getTimestamp() <= to)
.sorted(comparing(Log::getTimestamp))
.collect(Collectors.toList());
}
private CoreV1Api kApi() throws IOException {
if (kApi == null) {
Configuration.setDefaultApiClient(
Config
.defaultClient()
.setReadTimeout(30000)
.setWriteTimeout(30000)
.setConnectTimeout(30000));
kApi = new CoreV1Api();
}
return kApi;
}
}
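The `filter` method above passes each keyword to `String.matches`, which only succeeds when the regular expression covers the whole log line. The following is a minimal, dependency-free sketch of that behavior; the class name `KeywordFilterSketch` and the sample lines are hypothetical and not part of this commit.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; it mirrors the inclusion/exclusion logic on plain strings.
final class KeywordFilterSketch {
    // Keeps a line when it matches any inclusion pattern (or no inclusions are given)
    // and matches none of the exclusion patterns. String.matches requires the
    // pattern to match the entire line, not just a substring of it.
    static List<String> filter(List<String> lines, List<String> inclusions, List<String> exclusions) {
        return lines.stream()
            .filter(l -> inclusions.isEmpty() || inclusions.stream().anyMatch(l::matches))
            .filter(l -> exclusions.stream().noneMatch(l::matches))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "[app] request failed: timeout",
            "[app] request ok",
            "[app] health check failed");
        // A bare keyword does not match a whole line, so nothing is kept.
        System.out.println(filter(lines, Arrays.asList("failed"), Collections.emptyList()));
        // Wrapping the keyword in ".*" turns it into a substring match.
        System.out.println(filter(lines, Arrays.asList(".*failed.*"), Arrays.asList(".*health.*")));
    }
}
```

Under this reading, callers that want a substring match would need to send patterns such as `.*failed.*` rather than the bare word `failed`.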
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.skywalking.oap.query.graphql.type;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
@Data
@Builder
@AllArgsConstructor
@RequiredArgsConstructor
@Accessors(fluent = true)
public final class InternalLog {
private String line;
private String container;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.skywalking.oap.query.graphql.type;
import static java.lang.String.format;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.ResolverStyle;
import java.time.temporal.TemporalAccessor;
import java.util.List;
import com.google.common.base.Splitter;
import org.apache.skywalking.oap.server.core.query.type.ContentType;
import org.apache.skywalking.oap.server.core.query.type.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class LogAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(LogAdapter.class);
private final InternalLog log;
// k8s promises RFC3339 or RFC3339Nano timestamp, we truncate to RFC3339
private final DateTimeFormatter rfc3339Formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZZZZZ")
.withResolverStyle(ResolverStyle.LENIENT);
public Log adapt() {
Log l = new Log();
List<String> timeAndContent = Splitter.on(" ")
.limit(2)
.trimResults()
.splitToList(log.line());
if (timeAndContent.size() == 2) {
String timeStr = timeAndContent.get(0).replaceAll("\\.\\d+Z", "Z");
try {
TemporalAccessor t = rfc3339Formatter.parse(timeStr);
long timestamp = Instant.from(t).getEpochSecond();
l.setTimestamp(timestamp);
l.setContent(format("[%s] %s", log.container(), timeAndContent.get(1)));
l.setContentType(ContentType.TEXT);
} catch (Exception e) {
LOGGER.warn("Failed to parse log entry, {}", log, e);
}
}
return l;
}
}
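`LogAdapter` splits each line into a leading timestamp and the remaining content. A rough sketch of the same idea is shown below, using `OffsetDateTime.parse` from the JDK in place of the custom `DateTimeFormatter` pattern and the fractional-second stripping. This is a swapped-in technique for illustration, and `TimestampedLineSketch` with its methods is a hypothetical name that does not exist in the commit.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;

// Hypothetical illustration only; assumes each line starts with an RFC 3339 timestamp
// followed by a single space, as produced when timestamps are requested from Kubernetes.
final class TimestampedLineSketch {
    // Returns the epoch seconds of the leading timestamp, or -1 when it is missing or unparseable.
    static long epochSecondOf(String line) {
        int space = line.indexOf(' ');
        if (space <= 0) {
            return -1L;
        }
        try {
            // ISO_OFFSET_DATE_TIME accepts optional fractional seconds and both "Z" and "+08:00" offsets.
            return OffsetDateTime.parse(line.substring(0, space)).toInstant().getEpochSecond();
        } catch (DateTimeParseException e) {
            return -1L;
        }
    }

    // Returns everything after the first space, trimmed; the whole line when there is no space.
    static String contentOf(String line) {
        int space = line.indexOf(' ');
        return space < 0 ? line : line.substring(space + 1).trim();
    }

    public static void main(String[] args) {
        String line = "2022-05-24T08:15:30.123456789Z hello world";
        System.out.println(epochSecondOf(line) + " -> " + contentOf(line));
    }
}
```

Returning a sentinel for unparseable lines is a design choice of this sketch; the adapter above instead logs a warning and returns a `Log` without a timestamp or content.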
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.skywalking.oap.query.graphql.type;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class OndemandLogQueryCondition {
private String namespace;
private String container;
private String serviceId;
private String serviceInstanceId;
private Duration duration;
private List<String> keywordsOfContent = Collections.emptyList();
private List<String> excludingKeywordsOfContent = Collections.emptyList();
}
Subproject commit 3adb9f68649b1a03fa3fa664192ff4bd7523f18b
Subproject commit 6c461cc60a653de801e51ea914b78a243ce66819
......@@ -399,6 +399,10 @@ query:
maxQueryComplexity: ${SW_QUERY_MAX_QUERY_COMPLEXITY:1000}
# Allow user add, disable and update UI template
enableUpdateUITemplate: ${SW_ENABLE_UPDATE_UI_TEMPLATE:false}
# "On demand log" allows users to fetch Pod containers' log in real time,
# because this might expose secrets in the logs (if any), users need
# to enable this manually, and add permissions to OAP cluster role.
enableOnDemandPodLog: ${SW_ENABLE_ON_DEMAND_POD_LOG:false}
alarm:
selector: ${SW_ALARM:default}
......
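The `${SW_ENABLE_ON_DEMAND_POD_LOG:false}` entry above reads the value from an environment variable and falls back to the default after the colon. The sketch below illustrates that lookup rule only; it is not SkyWalking's actual configuration loader, and `PlaceholderSketch` and `resolve` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of "${NAME:default}" resolution; the real loader may differ.
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z0-9_]+):([^}]*)\\}");

    // Replaces every ${NAME:default} with env.get(NAME) when present, otherwise with the default.
    static String resolve(String raw, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = env.getOrDefault(m.group(1), m.group(2));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String raw = "${SW_ENABLE_ON_DEMAND_POD_LOG:false}";
        // Without the variable set, the default after the colon is used.
        System.out.println(resolve(raw, Collections.emptyMap()));
        // With the variable set, its value overrides the default.
        System.out.println(resolve(raw, Collections.singletonMap("SW_ENABLE_ON_DEMAND_POD_LOG", "true")));
    }
}
```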