KafkaLogE2E.java 6.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.skywalking.e2e.kafka;

import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.e2e.annotation.ContainerHostAndPort;
import org.apache.skywalking.e2e.annotation.DockerCompose;
import org.apache.skywalking.e2e.base.SkyWalkingE2E;
import org.apache.skywalking.e2e.base.SkyWalkingTestAdapter;
import org.apache.skywalking.e2e.common.HostAndPort;
import org.apache.skywalking.e2e.log.Log;
import org.apache.skywalking.e2e.log.LogsMatcher;
import org.apache.skywalking.e2e.log.LogsQuery;
import org.apache.skywalking.e2e.retryable.RetryableTest;
import org.apache.skywalking.e2e.service.Service;
import org.apache.skywalking.e2e.service.ServicesMatcher;
import org.apache.skywalking.e2e.service.ServicesQuery;
import org.apache.skywalking.e2e.service.endpoint.EndpointQuery;
import org.apache.skywalking.e2e.service.endpoint.Endpoints;
import org.apache.skywalking.e2e.service.endpoint.EndpointsMatcher;
import org.apache.skywalking.e2e.service.instance.Instances;
import org.apache.skywalking.e2e.service.instance.InstancesMatcher;
import org.apache.skywalking.e2e.service.instance.InstancesQuery;
import org.apache.skywalking.e2e.utils.Times;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.DockerComposeContainer;

import static org.apache.skywalking.e2e.utils.Yamls.load;

@Slf4j
@SkyWalkingE2E
public class KafkaLogE2E extends SkyWalkingTestAdapter {

    @SuppressWarnings("unused")
    @DockerCompose({
        "docker/kafka/docker-compose.yml",
        "docker/kafka/docker-compose.log.yml"
    })
    private DockerComposeContainer<?> justForSideEffects;

    @SuppressWarnings("unused")
    @ContainerHostAndPort(name = "oap", port = 12800)
    private HostAndPort swWebappHostPort;

    @SuppressWarnings("unused")
64
    @ContainerHostAndPort(name = "provider", port = 9090)
65 66 67 68 69
    private HostAndPort serviceHostPort;

    @BeforeAll
    public void setUp() throws Exception {
        queryClient(swWebappHostPort);
70
        trafficController(serviceHostPort, "/logs/trigger");
71
        trafficController(serviceHostPort, "/file/logs/trigger");
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
    }

    @AfterAll
    public void tearDown() {
        if (trafficController != null) {
            trafficController.stop();
        }
    }

    @RetryableTest
    public void verifyService() throws Exception {
        List<Service> services = graphql.services(
            new ServicesQuery().start(startTime).end(Times.now()));
        services = services.stream().filter(s -> !s.getLabel().equals("oap::oap-server")).collect(Collectors.toList());
        LOGGER.info("services: {}", services);

        load("expected/log/services.yml").as(ServicesMatcher.class).verify(services);

        for (Service service : services) {
            LOGGER.info("verifying service instance: {}", service);
            // instance
            verifyServiceInstances(service);
        }
    }

    @RetryableTest
    public void verifyLog() throws Exception {
99
        LogsQuery logsQuery = new LogsQuery().serviceId("WW91cl9BcHBsaWNhdGlvbk5hbWU=.1")
100 101 102
                                             .start(startTime)
                                             .end(Times.now());
        if (graphql.supportQueryLogsByKeywords()) {
103
            logsQuery.keywordsOfContent("now");
104 105 106 107 108 109 110
        }
        final List<Log> logs = graphql.logs(logsQuery);
        LOGGER.info("logs: {}", logs);

        load("expected/log/logs.yml").as(LogsMatcher.class).verifyLoosely(logs);
    }

111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
    @RetryableTest
    public void verifyLogFromFilebeat() throws Exception {
        final String agent = "filebeat";
        verifyLogFrom(agent, "log4j fileLogger");
        verifyLogFrom(agent, "log4j2 fileLogger");
        verifyLogFrom(agent, "logback fileLogger");
    }
    
    @RetryableTest
    public void verifyLogFromFluentd() throws Exception {
        final String agent = "fluentd";
        verifyLogFrom(agent, "log4j fileLogger");
        verifyLogFrom(agent, "log4j2 fileLogger");
        verifyLogFrom(agent, "logback fileLogger");
    }
    
    private void verifyLogFrom(String agent, String keyword) throws Exception {
        LogsQuery logsQuery = new LogsQuery().serviceId("WW91cl9BcHBsaWNhdGlvbk5hbWU=.1")
                                            .addTag("agent", agent)
                                            .start(startTime)
                                            .end(Times.now());
        if (graphql.supportQueryLogsByKeywords()) {
            logsQuery.keywordsOfContent(keyword);
        }
        final List<Log> logs = graphql.logs(logsQuery);
        LOGGER.info("logs: {}", logs);
    
        load("expected/log/logs.yml").as(LogsMatcher.class).verifyLoosely(logs);
    }

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
    private void verifyServiceInstances(final Service service) throws Exception {
        final Instances instances = graphql.instances(
            new InstancesQuery().serviceId(service.getKey()).start(startTime).end(Times.now()));

        LOGGER.info("instances: {}", instances);
        load("expected/log/instances.yml").as(InstancesMatcher.class).verify(instances);
    }

    private void verifyServiceEndpoints(final Service service) throws Exception {
        final Endpoints endpoints = graphql.endpoints(new EndpointQuery().serviceId(service.getKey()));
        LOGGER.info("endpoints: {}", endpoints);

        load("expected/log/endpoints.yml").as(EndpointsMatcher.class).verify(endpoints);
    }
}