/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.apm.agent.core.kafka;

import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig.Plugin.Kafka;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.loader.AgentClassLoader;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;

/**
 * Configuring, initializing and holding a KafkaProducer instance for reporters.
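 * <p>
 * A minimal usage sketch from a reporter's point of view; the topic name, key and
 * {@code payload} bytes below are illustrative only, not defined by this class:
 * <pre>{@code
 * KafkaProducerManager manager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);
 * String topic = manager.formatTopicNameThenRegister("skywalking-segments");
 * KafkaProducer<String, Bytes> producer = manager.getProducer();
 * if (producer != null) { // null until the connection to the cluster has been established
 *     producer.send(new ProducerRecord<>(topic, "service-instance", Bytes.wrap(payload)));
 * }
 * }</pre>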
 */
@DefaultImplementor
public class KafkaProducerManager implements BootService, Runnable {

    private static final ILog LOGGER = LogManager.getLogger(KafkaProducerManager.class);

    private final Set<String> topics = new HashSet<>();
    private final List<KafkaConnectionStatusListener> listeners = new ArrayList<>();

    private volatile KafkaProducer<String, Bytes> producer;

    private ScheduledFuture<?> bootProducerFuture;

    @Override
    public void prepare() {
    }

    @Override
    public void boot() {
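        // Retry the connection attempt on a fixed 120-second schedule until run() succeeds
        // in creating the producer and cancels this task.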
        bootProducerFuture = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("kafkaProducerInitThread")
        ).scheduleAtFixedRate(new RunnableWithExceptionProtection(
                this,
                t -> LOGGER.error("unexpected exception.", t)
        ), 0, 120, TimeUnit.SECONDS);
    }

    /**
     * Prefix the topic with the configured namespace, if any, and register it so its existence
     * can be verified before the producer is created.
     */
    String formatTopicNameThenRegister(String topic) {
        String topicName = StringUtil.isBlank(Kafka.NAMESPACE) ? topic
                : Kafka.NAMESPACE + "-" + topic;
        topics.add(topicName);
        return topicName;
    }

    public void addListener(KafkaConnectionStatusListener listener) {
        if (!listeners.contains(listener)) {
            listeners.add(listener);
        }
    }

    @Override
    public void onComplete() {
    }

    @Override
    public void run() {
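        // Kafka resolves some classes reflectively through the thread context class loader,
        // so point it at the agent class loader where the agent-shipped Kafka classes live.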
        Thread.currentThread().setContextClassLoader(AgentClassLoader.getDefault());

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS);

        if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
            Gson gson = new Gson();
            Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
            config.forEach(properties::setProperty);
        }
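        // PRODUCER_CONFIG entries are applied last, so they override keys that were also
        // supplied through PRODUCER_CONFIG_JSON.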
        Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);

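        // Verify that all registered topics exist on the cluster before creating the producer;
        // if any are missing, give up for now and let the scheduled task retry later.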
        try (AdminClient adminClient = AdminClient.create(properties)) {
            DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
            Set<String> missedTopics = topicsResult.values().entrySet().stream()
                    .map(entry -> {
                        try {
                            entry.getValue().get(
                                    Kafka.GET_TOPIC_TIMEOUT,
                                    TimeUnit.SECONDS
                            );
                            return null;
                        } catch (InterruptedException | ExecutionException | TimeoutException e) {
                            LOGGER.error(e, "Get KAFKA topic:{} error.", entry.getKey());
                        }
                        return entry.getKey();
                    })
                    .filter(Objects::nonNull)
                    .collect(Collectors.toSet());
    
            if (!missedTopics.isEmpty()) {
                LOGGER.warn("kafka topics {} do not exist, abort connecting to the kafka cluster", missedTopics);
                return;
            }
    
            try {
                producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
            } catch (Exception e) {
                LOGGER.error(e, "connect to kafka cluster '{}' failed", Kafka.BOOTSTRAP_SERVERS);
                return;
            }
            // notify listeners to send data if no exception has been thrown
            notifyListeners(KafkaConnectionStatus.CONNECTED);
            bootProducerFuture.cancel(true);
        }
    }

    private void notifyListeners(KafkaConnectionStatus status) {
        for (KafkaConnectionStatusListener listener : listeners) {
            listener.onStatusChanged(status);
        }
    }

    /**
     * Get the KafkaProducer instance to send data to Kafka broker.
     *
     * @return the Kafka producer, or {@code null} if the connection to the Kafka cluster has not
     *         been established yet
     */
    public final KafkaProducer<String, Bytes> getProducer() {
        return producer;
    }

    /**
     * Make the Kafka producer initialize later, but before {@link GRPCChannelManager}.
     *
     * @return priority value
     */
    @Override
    public int priority() {
        return ServiceManager.INSTANCE.findService(GRPCChannelManager.class).priority() - 1;
    }

    @Override
    public void shutdown() {
        // the producer is null if a connection to the cluster was never established
        if (producer != null) {
            producer.flush();
            producer.close();
        }
    }
}