提交 524ff06e 编写于 作者: R Rajan Dhabalia 提交者: Matteo Merli

Fix: register zk-stats listener after broker service started (#573)

上级 3047d1c2
......@@ -72,6 +72,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
......@@ -165,6 +167,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final int keepAliveIntervalSeconds;
private final PulsarStats pulsarStats;
private final EventListner zkStatsListener;
private final AuthenticationService authenticationService;
public static final String BROKER_SERVICE_CONFIGURATION_PATH = "/admin/configuration";
......@@ -190,10 +193,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>();
this.pulsarStats = new PulsarStats(pulsar);
// register listener to capture zk-latency
ClientCnxnAspect.addListener((eventType, latencyMs) -> {
this.pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
});
this.offlineTopicStatCache = new ConcurrentOpenHashMap<>();
final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
......@@ -265,8 +264,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
"Disabling per broker unack-msg blocking due invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ",
pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked());
}
// register listener to capture zk-latency
zkStatsListener = new EventListner() {
@Override
public void recordLatency(EventType eventType, long latencyMs) {
pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
}
};
PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}
......@@ -307,6 +312,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startBacklogQuotaChecker();
// register listener to capture zk-latency
ClientCnxnAspect.addListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
}
void startStatsUpdater() {
......@@ -370,6 +378,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
backlogQuotaChecker.shutdown();
authenticationService.close();
pulsarStats.close();
ClientCnxnAspect.removeListener(zkStatsListener);
ClientCnxnAspect.registerExecutor(null);
log.info("Broker service completely shut down");
}
......
......@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.zookeeper.aspectj;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.jute.Record;
......@@ -54,6 +55,8 @@ public class ClientCnxnAspect {
public static enum EventType {
write, read, other;
}
private static ExecutorService eventProcessExecutor;
public static interface EventListner {
public void recordLatency(EventType eventType, long latencyMiliSecond);
......@@ -68,7 +71,19 @@ public class ClientCnxnAspect {
@Around("processEvent()")
public void timedProcessEvent(ProceedingJoinPoint joinPoint) throws Throwable {
joinPoint.proceed();
// zkResponse event shouldn't be blocked and it should be processed
// async
if (eventProcessExecutor != null && !eventProcessExecutor.isShutdown()) {
eventProcessExecutor.submit(new Runnable() {
@Override
public void run() {
processEvent(joinPoint);
}
});
}
}
private void processEvent(ProceedingJoinPoint joinPoint) {
long startTimeMs = getStartTime(joinPoint.getArgs()[0]);
if (startTimeMs == -1) {
// couldn't find start time
......@@ -139,7 +154,8 @@ public class ClientCnxnAspect {
Field ctxField = Class.forName("org.apache.zookeeper.ClientCnxn$Packet").getDeclaredField("ctx");
ctxField.setAccessible(true);
Object zooworker = ctxField.get(packet);
if (zooworker.getClass().getName().equals("org.apache.bookkeeper.zookeeper.ZooWorker")) {
if (zooworker != null
&& zooworker.getClass().getName().equals("org.apache.bookkeeper.zookeeper.ZooWorker")) {
Field timeField = Class.forName("org.apache.bookkeeper.zookeeper.ZooWorker")
.getDeclaredField("startTimeMs");
timeField.setAccessible(true);
......@@ -172,10 +188,18 @@ public class ClientCnxnAspect {
return null;
}
public static void registerExecutor(ExecutorService executor) {
eventProcessExecutor = executor;
}
public static void addListener(EventListner listener) {
listeners.add(listener);
}
public static void removeListener(EventListner listener) {
listeners.remove(listener);
}
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnAspect.class);
}
}
\ No newline at end of file
......@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.stats.Metrics;
......@@ -52,7 +53,6 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.aspectj.weaver.loadtime.Agent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
......@@ -60,7 +60,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.ea.agentloader.AgentLoader;
import com.google.common.util.concurrent.AtomicDouble;
public class ZooKeeperClientAspectJTest {
......@@ -74,11 +73,11 @@ public class ZooKeeperClientAspectJTest {
static {
// load agent with aspectjweaver-Agent for testing
// maven-test waves advice on build-goal so, maven doesn't need explicit loading
// uncomment it while testing on eclipse:
//AgentLoader.loadAgentClass(Agent.class.getName(), null);
// uncomment it while testing on eclipse:
// AgentLoader.loadAgentClass(Agent.class.getName(), null);
}
@Test(enabled=false)
@Test
public void testZkConnected() throws Exception {
try {
ZooKeeperClientFactory zkf = new ZookeeperBkClientFactoryImpl();
......@@ -122,13 +121,17 @@ public class ZooKeeperClientAspectJTest {
final AtomicInteger writeCount = new AtomicInteger(0);
final AtomicInteger readCount = new AtomicInteger(0);
ClientCnxnAspect.addListener((EventType eventType, long latencyMiliSecond) -> {
if (eventType.equals(EventType.write)) {
writeCount.incrementAndGet();
} else if (eventType.equals(EventType.read)) {
readCount.incrementAndGet();
EventListner listener = new EventListner() {
@Override
public void recordLatency(EventType eventType, long latencyMiliSecond) {
if (eventType.equals(EventType.write)) {
writeCount.incrementAndGet();
} else if (eventType.equals(EventType.read)) {
readCount.incrementAndGet();
}
}
});
};
ClientCnxnAspect.addListener(listener);
CountDownLatch createLatch = new CountDownLatch(1);
CountDownLatch deleteLatch = new CountDownLatch(1);
CountDownLatch readLatch = new CountDownLatch(1);
......@@ -152,6 +155,7 @@ public class ZooKeeperClientAspectJTest {
Thread.sleep(500);
Assert.assertEquals(writeCount.get(), 2);
Assert.assertEquals(readCount.get(), 2);
ClientCnxnAspect.removeListener(listener);
} finally {
if (localZkc != null) {
localZkc.close();
......@@ -219,7 +223,7 @@ public class ZooKeeperClientAspectJTest {
deleteLatch.await();
existLatch.await();
readLatch.await();
Thread.sleep(500);
Thread.sleep(10);
BrokerService brokerService = pulsar.getBrokerService();
brokerService.updateRates();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册