提交 d5ebcb67 编写于 作者: R Rajan 提交者: rdhabalia

create dynamic-config znode if not present (#327)

上级 cde3d231
......@@ -43,6 +43,7 @@ import java.util.function.Consumer;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
......@@ -50,12 +51,15 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
......@@ -956,6 +960,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private void updateDynamicServiceConfiguration() {
try {
// create dynamic-config znode if not present
if (pulsar.getZkClient().exists(BROKER_SERVICE_CONFIGURATION_PATH, false) == null) {
try {
byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(Maps.newHashMap());
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), BROKER_SERVICE_CONFIGURATION_PATH, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
// Ok
}
}
Optional<Map<String, String>> data = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
if (data.isPresent() && data.get() != null) {
data.get().forEach((key,value)-> {
......
......@@ -400,19 +400,18 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
*/
@Test
public void testUpdateDynamicConfigurationWithZkWatch() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
Method updateConfigListenerMethod = BrokerService.class
.getDeclaredMethod("updateConfigurationAndRegisterListeners");
updateConfigListenerMethod.setAccessible(true);
updateConfigListenerMethod.invoke(pulsar.getBrokerService());
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
final int initValue = 30000;
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue);
// (1) try to update dynamic field
final long shutdownTime = 10;
// update configuration
admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
// sleep incrementally as zk-watch notification is async and may take some time
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != initValue) {
Thread.sleep(50 + (i * 10));
}
}
// wait config to be updated
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != shutdownTime) {
......@@ -443,9 +442,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
/**
* <pre>
* verifies: that registerListener updates pulsar.config value with newly updated zk-dynamic config
* NOTE: pulsar can't set the watch on non-existing znode
* So, when pulsar starts it is not able to set the watch on non-existing znode of dynamicConfiguration
* So, here, after creating znode we will trigger register explicitly
* 1.start pulsar
* 2.update zk-config with admin api
* 3. trigger watch and listener
......@@ -456,14 +452,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public void testUpdateDynamicLocalConfiguration() throws Exception {
// (1) try to update dynamic field
final long initValue = 30000;
final long shutdownTime = 10;
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue);
// update configuration
admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
// Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
Method getPermitZkNodeMethod = BrokerService.class.getDeclaredMethod("updateConfigurationAndRegisterListeners");
getPermitZkNodeMethod.setAccessible(true);
getPermitZkNodeMethod.invoke(pulsar.getBrokerService());
// sleep incrementally as zk-watch notification is async and may take some time
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != initValue) {
Thread.sleep(50 + (i * 10));
}
}
// verify value is updated
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
}
......@@ -481,12 +480,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
final String configName = "brokerShutdownTimeoutMs";
final long shutdownTime = 10;
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
try {
admin.brokers().getAllDynamicConfigurations();
fail("should have fail as configuration is not exist");
} catch (PulsarAdminException.NotFoundException ne) {
// ok : expected
}
Map<String, String> configs = admin.brokers().getAllDynamicConfigurations();
assertTrue(configs.isEmpty());
assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
// update configuration
admin.brokers().updateDynamicConfiguration(configName, Long.toString(shutdownTime));
......
......@@ -165,7 +165,7 @@ public abstract class MockedPulsarServiceBaseTest {
doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor();
}
private MockZooKeeper createMockZooKeeper() throws Exception {
public static MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.sameThreadExecutor());
List<ACL> dummyAclList = new ArrayList<ACL>(0);
......
......@@ -69,12 +69,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
*/
@Test
public void testThrottlingLookupRequestSemaphore() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
BrokerService service = pulsar.getBrokerService();
assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0));
......@@ -91,12 +85,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
@Test
public void testLookupThrottlingForClientByBroker0Permit() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
final String topicName = "persistent://prop/usw/my-ns/newTopic";
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
......@@ -140,13 +128,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
*/
@Test
public void testLookupThrottlingForClientByBroker() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
final String topicName = "persistent://prop/usw/my-ns/newTopic";
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
......@@ -211,12 +192,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
@Test
public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception {
// create configuration znode
ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, "{}".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode();
final String topicName = "persistent://prop/usw/my-ns/newTopic";
com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
......@@ -293,11 +268,4 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
}
}
private void setWatchOnThrottlingZnode() throws Exception {
Method updateConfigListenerMethod = BrokerService.class
.getDeclaredMethod("updateConfigurationAndRegisterListeners");
updateConfigListenerMethod.setAccessible(true);
updateConfigListenerMethod.invoke(pulsar.getBrokerService());
}
}
\ No newline at end of file
......@@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.service;
import static com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.matches;
......@@ -91,7 +92,7 @@ public class PersistentDispatcherFailoverConsumerTest {
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
ZooKeeper mockZk = mock(ZooKeeper.class);
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
configCacheService = mock(ConfigurationCacheService.class);
......
......@@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.service;
import static com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
......@@ -84,7 +85,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
mlFactoryMock = factory;
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
ZooKeeper mockZk = mock(ZooKeeper.class);
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
brokerService = spy(new BrokerService(pulsar));
......
......@@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.service;
import static com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
......@@ -130,7 +131,7 @@ public class PersistentTopicTest {
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
ZooKeeper mockZk = mock(ZooKeeper.class);
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
configCacheService = mock(ConfigurationCacheService.class);
......
......@@ -50,7 +50,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
......@@ -95,6 +94,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import static com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
/**
*/
......@@ -135,7 +135,7 @@ public class ServerCnxTest {
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
ZooKeeper mockZk = mock(ZooKeeper.class);
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
configCacheService = mock(ConfigurationCacheService.class);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册