提交 4e1bde9f 编写于 作者: M Matteo Merli 提交者: GitHub

In unit tests, do ZK cache reloads in same thread, to avoid race conditions (#31)

上级 afc771bd
......@@ -180,7 +180,7 @@ public class PulsarService implements AutoCloseable {
adminClient.close();
adminClient = null;
}
nsservice = null;
// executor is not initialized in mocks even when real close method is called
......@@ -357,10 +357,10 @@ public class PulsarService implements AutoCloseable {
LOG.info("starting configuration cache service");
this.localZkCache = new LocalZooKeeperCache(getZkClient(), this.orderedExecutor);
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor());
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
this.orderedExecutor, this.executor);
getOrderedExecutor(), this.executor);
try {
this.globalZkCache.start();
} catch (IOException e) {
......
......@@ -68,6 +68,8 @@ public abstract class MockedPulsarServiceBaseTest {
protected MockZooKeeper mockZookKeeper;
protected NonClosableMockBookKeeper mockBookKeeper;
private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
public MockedPulsarServiceBaseTest() {
this.conf = new ServiceConfiguration();
this.conf.setBrokerServicePort(BROKER_PORT);
......@@ -96,6 +98,8 @@ public abstract class MockedPulsarServiceBaseTest {
mockZookKeeper = createMockZooKeeper();
mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), mockZookKeeper);
sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
startBroker();
brokerUrl = new URL("http://localhost:" + BROKER_WEBSERVICE_PORT);
......@@ -110,6 +114,7 @@ public abstract class MockedPulsarServiceBaseTest {
pulsar.close();
mockBookKeeper.reallyShutdow();
mockZookKeeper.shutdown();
sameThreadOrderedSafeExecutor.shutdown();
}
protected abstract void setup() throws Exception;
......@@ -146,6 +151,8 @@ public abstract class MockedPulsarServiceBaseTest {
Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor();
}
private MockZooKeeper createMockZooKeeper() throws Exception {
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.broker.auth;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
public class SameThreadOrderedSafeExecutor extends OrderedSafeExecutor {
public SameThreadOrderedSafeExecutor() {
super(1, "ordered-executor");
}
@Override
public void submit(SafeRunnable r) {
r.run();
}
@Override
public void submitOrdered(int orderingKey, SafeRunnable r) {
r.run();
}
@Override
public void submitOrdered(long orderingKey, SafeRunnable r) {
r.run();
}
@Override
public void submitOrdered(Object orderingKey, SafeRunnable r) {
r.run();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册