PulsarConnectorCache.java 7.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.sql.presto;

21 22 23
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
24 25
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
26 27
import java.io.IOException;
import java.util.Map;
28 29 30 31 32
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
33 34
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
35 36 37 38 39 40 41
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;

42 43 44
/**
 * Implementation of a cache for the Pulsar connector.
 */
45 46
public class PulsarConnectorCache {

47 48
    private static final Logger log = Logger.get(PulsarConnectorCache.class);

49 50
    @VisibleForTesting
    static PulsarConnectorCache instance;
51 52 53

    private final ManagedLedgerFactory managedLedgerFactory;

54
    private final StatsProvider statsProvider;
55 56 57 58 59 60 61 62
    private OrderedScheduler offloaderScheduler;
    private Offloaders offloaderManager;
    private LedgerOffloader offloader;

    private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
    private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
    private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";

63

64 65
    private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
        this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
66 67 68 69 70 71
        this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
                StatsProvider.class, getClass().getClassLoader());

        // start stats provider
        ClientConfiguration clientConfiguration = new ClientConfiguration();

72
        pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty);
73 74

        this.statsProvider.start(clientConfiguration);
75 76

        this.offloader = initManagedLedgerOffloader(pulsarConnectorConfig);
77 78 79 80 81 82 83 84 85 86 87
    }

    public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
        synchronized (PulsarConnectorCache.class) {
            if (instance == null) {
                instance = new PulsarConnectorCache(pulsarConnectorConfig);
            }
        }
        return instance;
    }

88 89
    private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
        throws Exception {
90 91
        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
92 93
                .setClientTcpNoDelay(false)
                .setUseV2WireProtocol(true)
94
                .setStickyReadsEnabled(false)
95 96 97 98
                .setReadEntryTimeout(60);
        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
    }

99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
    public ManagedLedgerConfig getManagedLedgerConfig() {

        return new ManagedLedgerConfig()
                .setLedgerOffloader(this.offloader);
    }

    private synchronized OrderedScheduler getOffloaderScheduler(PulsarConnectorConfig pulsarConnectorConfig) {
        if (this.offloaderScheduler == null) {
            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
                    .numThreads(pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads())
                    .name("pulsar-offloader").build();
        }
        return this.offloaderScheduler;
    }

    private LedgerOffloader initManagedLedgerOffloader(PulsarConnectorConfig conf) {

        try {
            if (StringUtils.isNotBlank(conf.getManagedLedgerOffloadDriver())) {
                checkNotNull(conf.getOffloadersDirectory(),
                        "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                        conf.getManagedLedgerOffloadDriver());
                this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory());
                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
                        conf.getManagedLedgerOffloadDriver());

                Map<String, String> offloaderProperties = conf.getOffloaderProperties();
                offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory());
                offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver());
128 129
                offloaderProperties
                    .put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
130 131 132

                try {
                    return offloaderFactory.create(
133 134 135 136 137 138
                        PulsarConnectorUtils.getProperties(offloaderProperties),
                        ImmutableMap.of(
                            LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
                            LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
                        ),
                        getOffloaderScheduler(conf));
139 140 141 142 143 144 145 146 147 148 149 150 151
                } catch (IOException ioe) {
                    log.error("Failed to create offloader: ", ioe);
                    throw new RuntimeException(ioe.getMessage(), ioe.getCause());
                }
            } else {
                log.info("No ledger offloader configured, using NULL instance");
                return NullLedgerOffloader.INSTANCE;
            }
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

152 153 154 155
    public ManagedLedgerFactory getManagedLedgerFactory() {
        return managedLedgerFactory;
    }

156 157 158 159
    public StatsProvider getStatsProvider() {
        return statsProvider;
    }

160
    public static void shutdown() throws Exception {
161 162 163
        synchronized (PulsarConnectorCache.class) {
            if (instance != null) {
                instance.statsProvider.stop();
164 165 166
                instance.managedLedgerFactory.shutdown();
                instance.offloaderScheduler.shutdown();
                instance.offloaderManager.close();
167 168
                instance = null;
            }
169 170 171
        }
    }
}