提交 f14a61a4 编写于 作者: S Sijie Guo 提交者: Jia Zhai

[test] add getters and setters to PulsarService & BrokerService (#4709)

*Motivation*

When using PulsarService or BrokerService for testing, it might require accessing
the components in PulsarService and BrokerService. This change is adding setters
and getters to access the components in PulsarService & BrokerService
(cherry picked from commit 5cff1691)
上级 34b3a947
......@@ -46,6 +46,9 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
......@@ -124,6 +127,8 @@ import org.slf4j.LoggerFactory;
* Main class for Pulsar broker service
*/
@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private ServiceConfiguration config = null;
......@@ -351,7 +356,7 @@ public class PulsarService implements AutoCloseable {
if (!config.getBrokerServicePort().isPresent() && !config.getBrokerServicePortTls().isPresent()) {
throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present");
}
// Now we are ready to start services
localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
......@@ -459,7 +464,7 @@ public class PulsarService implements AutoCloseable {
+ (config.getBrokerServicePort().isPresent() ? "broker url= " + brokerServiceUrl : "")
+ (config.getBrokerServicePortTls().isPresent() ? "broker url= " + brokerServiceUrlTls : "");
LOG.info("messaging service is ready");
LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
config.getClusterName(), ReflectionToStringBuilder.toString(config));
} catch (Exception e) {
......@@ -470,7 +475,7 @@ public class PulsarService implements AutoCloseable {
}
}
private void startLeaderElectionService() {
protected void startLeaderElectionService() {
this.leaderElectionService = new LeaderElectionService(this, new LeaderListener() {
@Override
public synchronized void brokerIsTheLeaderNow() {
......@@ -504,7 +509,7 @@ public class PulsarService implements AutoCloseable {
leaderElectionService.start();
}
private void acquireSLANamespace() {
protected void acquireSLANamespace() {
try {
// Namespace not created hence no need to unload it
String nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config);
......@@ -553,7 +558,7 @@ public class PulsarService implements AutoCloseable {
}
}
private void startZkCacheService() throws PulsarServerException {
protected void startZkCacheService() throws PulsarServerException {
LOG.info("starting configuration cache service");
......@@ -573,7 +578,7 @@ public class PulsarService implements AutoCloseable {
this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
}
private void startNamespaceService() throws PulsarServerException {
protected void startNamespaceService() throws PulsarServerException {
LOG.info("Starting name space service, bootstrap namespaces=" + config.getBootstrapNamespaces());
......@@ -584,7 +589,7 @@ public class PulsarService implements AutoCloseable {
return () -> new NamespaceService(PulsarService.this);
}
private void startLoadManagementService() throws PulsarServerException {
protected void startLoadManagementService() throws PulsarServerException {
LOG.info("Starting load management service ...");
this.loadManager.get().start();
......@@ -708,7 +713,7 @@ public class PulsarService implements AutoCloseable {
public ManagedLedgerClientFactory getManagedLedgerClientFactory() {
return managedLedgerClientFactory;
}
public LedgerOffloader getManagedLedgerOffloader() {
return offloader;
}
......@@ -851,7 +856,7 @@ public class PulsarService implements AutoCloseable {
builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath());
builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection());
}
// most of the admin request requires to make zk-call so, keep the max read-timeout based on
// zk-operation timeout
builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
......@@ -931,49 +936,14 @@ public class PulsarService implements AutoCloseable {
return String.format("https://%s:%d", host, port);
}
public String getBindAddress() {
return bindAddress;
}
public String getAdvertisedAddress() {
return advertisedAddress;
}
public String getSafeWebServiceAddress() {
return webServiceAddress != null ? webServiceAddress : webServiceAddressTls;
}
public String getWebServiceAddress() {
return webServiceAddress;
}
public String getWebServiceAddressTls() {
return webServiceAddressTls;
}
public String getSafeBrokerServiceUrl() {
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
}
public String getBrokerServiceUrl() {
return brokerServiceUrl;
}
public String getBrokerServiceUrlTls() {
return brokerServiceUrlTls;
}
public AtomicReference<LoadManager> getLoadManager() {
return loadManager;
}
public String getBrokerVersion() {
return brokerVersion;
}
public SchemaRegistryService getSchemaRegistryService() {
return schemaRegistryService;
}
private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
......
......@@ -63,6 +63,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
......@@ -132,10 +135,11 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PROTECTED)
public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> {
private static final Logger log = LoggerFactory.getLogger(BrokerService.class);
......@@ -174,7 +178,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private DistributedIdGenerator producerNameGenerator;
private final static String producerNameGeneratorPath = "/counters/producer-name";
public final static String producerNameGeneratorPath = "/counters/producer-name";
private final BacklogQuotaManager backlogQuotaManager;
......@@ -334,7 +338,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
}
void startStatsUpdater(int statsUpdateInitailDelayInSecs, int statsUpdateFrequencyInSecs) {
protected void startStatsUpdater(int statsUpdateInitailDelayInSecs, int statsUpdateFrequencyInSecs) {
statsUpdater.scheduleAtFixedRate(safeRun(this::updateRates),
statsUpdateInitailDelayInSecs, statsUpdateFrequencyInSecs, TimeUnit.SECONDS);
......@@ -342,7 +346,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
updateRates();
}
void startInactivityMonitor() {
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
int interval = pulsar().getConfiguration().getBrokerServicePurgeInactiveFrequencyInSeconds();
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(interval)), interval, interval,
......@@ -364,13 +368,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
void startMessageExpiryMonitor() {
protected void startMessageExpiryMonitor() {
int interval = pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkMessageExpiry), interval, interval,
TimeUnit.MINUTES);
}
void startCompactionMonitor() {
protected void startCompactionMonitor() {
int interval = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
if (interval > 0) {
compactionMonitor.scheduleAtFixedRate(safeRun(() -> checkCompaction()),
......@@ -378,7 +382,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
void startBacklogQuotaChecker() {
protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
log.info("Scheduling a thread to check backlog quota after [{}] seconds in background", interval);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册