Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
378217d4
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
378217d4
编写于
4月 03, 2017
作者:
R
Rajan
提交者:
Matteo Merli
4月 03, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
create dynamic-config znode if not present (#327)
上级
8c270c94
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
41 addition
and
61 deletion
+41
-61
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
...n/java/com/yahoo/pulsar/broker/service/BrokerService.java
+14
-0
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
...test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
+18
-23
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
...yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+1
-1
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceThrottlingTest.java
...oo/pulsar/broker/service/BrokerServiceThrottlingTest.java
+0
-32
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
...ker/service/PersistentDispatcherFailoverConsumerTest.java
+2
-1
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java
.../pulsar/broker/service/PersistentTopicConcurrentTest.java
+2
-1
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java
.../com/yahoo/pulsar/broker/service/PersistentTopicTest.java
+2
-1
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java
...t/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java
+2
-2
未找到文件。
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/BrokerService.java
浏览文件 @
378217d4
...
...
@@ -43,6 +43,7 @@ import java.util.function.Consumer;
import
com.yahoo.pulsar.broker.loadbalance.LoadManager
;
import
org.apache.bookkeeper.client.BookKeeper.DigestType
;
import
org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback
;
import
org.apache.bookkeeper.util.ZkUtils
;
import
org.apache.bookkeeper.mledger.ManagedLedger
;
import
org.apache.bookkeeper.mledger.ManagedLedgerConfig
;
import
org.apache.bookkeeper.mledger.ManagedLedgerException
;
...
...
@@ -50,12 +51,15 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import
org.apache.commons.lang.SystemUtils
;
import
org.apache.commons.lang3.tuple.ImmutablePair
;
import
org.apache.commons.lang3.tuple.Pair
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.KeeperException
;
import
org.apache.zookeeper.ZooDefs.Ids
;
import
org.apache.zookeeper.data.Stat
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.google.common.collect.Lists
;
import
com.google.common.collect.Maps
;
import
com.google.common.collect.Queues
;
import
com.yahoo.pulsar.broker.PulsarService
;
import
com.yahoo.pulsar.broker.ServiceConfiguration
;
...
...
@@ -956,6 +960,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private
void
updateDynamicServiceConfiguration
()
{
try
{
// create dynamic-config znode if not present
if
(
pulsar
.
getZkClient
().
exists
(
BROKER_SERVICE_CONFIGURATION_PATH
,
false
)
==
null
)
{
try
{
byte
[]
data
=
ObjectMapperFactory
.
getThreadLocal
().
writeValueAsBytes
(
Maps
.
newHashMap
());
ZkUtils
.
createFullPathOptimistic
(
pulsar
.
getZkClient
(),
BROKER_SERVICE_CONFIGURATION_PATH
,
data
,
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
}
catch
(
KeeperException
.
NodeExistsException
e
)
{
// Ok
}
}
Optional
<
Map
<
String
,
String
>>
data
=
dynamicConfigurationCache
.
get
(
BROKER_SERVICE_CONFIGURATION_PATH
);
if
(
data
.
isPresent
()
&&
data
.
get
()
!=
null
)
{
data
.
get
().
forEach
((
key
,
value
)->
{
...
...
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/admin/AdminApiTest.java
浏览文件 @
378217d4
...
...
@@ -400,19 +400,18 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
*/
@Test
public
void
testUpdateDynamicConfigurationWithZkWatch
()
throws
Exception
{
// create configuration znode
ZkUtils
.
createFullPathOptimistic
(
mockZookKeeper
,
BROKER_SERVICE_CONFIGURATION_PATH
,
"{}"
.
getBytes
(),
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
// Now, znode is created: set the watch and listener on the znode
Method
updateConfigListenerMethod
=
BrokerService
.
class
.
getDeclaredMethod
(
"updateConfigurationAndRegisterListeners"
);
updateConfigListenerMethod
.
setAccessible
(
true
);
updateConfigListenerMethod
.
invoke
(
pulsar
.
getBrokerService
());
pulsar
.
getConfiguration
().
setBrokerShutdownTimeoutMs
(
30000
);
final
int
initValue
=
30000
;
pulsar
.
getConfiguration
().
setBrokerShutdownTimeoutMs
(
initValue
);
// (1) try to update dynamic field
final
long
shutdownTime
=
10
;
// update configuration
admin
.
brokers
().
updateDynamicConfiguration
(
"brokerShutdownTimeoutMs"
,
Long
.
toString
(
shutdownTime
));
// sleep incrementally as zk-watch notification is async and may take some time
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
if
(
pulsar
.
getConfiguration
().
getBrokerShutdownTimeoutMs
()
!=
initValue
)
{
Thread
.
sleep
(
50
+
(
i
*
10
));
}
}
// wait config to be updated
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
if
(
pulsar
.
getConfiguration
().
getBrokerShutdownTimeoutMs
()
!=
shutdownTime
)
{
...
...
@@ -443,9 +442,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
/**
* <pre>
* verifies: that registerListener updates pulsar.config value with newly updated zk-dynamic config
* NOTE: pulsar can't set the watch on non-existing znode
* So, when pulsar starts it is not able to set the watch on non-existing znode of dynamicConfiguration
* So, here, after creating znode we will trigger register explicitly
* 1.start pulsar
* 2.update zk-config with admin api
* 3. trigger watch and listener
...
...
@@ -456,14 +452,17 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public
void
testUpdateDynamicLocalConfiguration
()
throws
Exception
{
// (1) try to update dynamic field
final
long
initValue
=
30000
;
final
long
shutdownTime
=
10
;
pulsar
.
getConfiguration
().
setBrokerShutdownTimeoutMs
(
30000
);
pulsar
.
getConfiguration
().
setBrokerShutdownTimeoutMs
(
initValue
);
// update configuration
admin
.
brokers
().
updateDynamicConfiguration
(
"brokerShutdownTimeoutMs"
,
Long
.
toString
(
shutdownTime
));
// Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
Method
getPermitZkNodeMethod
=
BrokerService
.
class
.
getDeclaredMethod
(
"updateConfigurationAndRegisterListeners"
);
getPermitZkNodeMethod
.
setAccessible
(
true
);
getPermitZkNodeMethod
.
invoke
(
pulsar
.
getBrokerService
());
// sleep incrementally as zk-watch notification is async and may take some time
for
(
int
i
=
0
;
i
<
5
;
i
++)
{
if
(
pulsar
.
getConfiguration
().
getBrokerShutdownTimeoutMs
()
!=
initValue
)
{
Thread
.
sleep
(
50
+
(
i
*
10
));
}
}
// verify value is updated
assertEquals
(
pulsar
.
getConfiguration
().
getBrokerShutdownTimeoutMs
(),
shutdownTime
);
}
...
...
@@ -481,12 +480,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
final
String
configName
=
"brokerShutdownTimeoutMs"
;
final
long
shutdownTime
=
10
;
pulsar
.
getConfiguration
().
setBrokerShutdownTimeoutMs
(
30000
);
try
{
admin
.
brokers
().
getAllDynamicConfigurations
();
fail
(
"should have fail as configuration is not exist"
);
}
catch
(
PulsarAdminException
.
NotFoundException
ne
)
{
// ok : expected
}
Map
<
String
,
String
>
configs
=
admin
.
brokers
().
getAllDynamicConfigurations
();
assertTrue
(
configs
.
isEmpty
());
assertNotEquals
(
pulsar
.
getConfiguration
().
getBrokerShutdownTimeoutMs
(),
shutdownTime
);
// update configuration
admin
.
brokers
().
updateDynamicConfiguration
(
configName
,
Long
.
toString
(
shutdownTime
));
...
...
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
浏览文件 @
378217d4
...
...
@@ -165,7 +165,7 @@ public abstract class MockedPulsarServiceBaseTest {
doReturn
(
sameThreadOrderedSafeExecutor
).
when
(
pulsar
).
getOrderedExecutor
();
}
p
rivate
MockZooKeeper
createMockZooKeeper
()
throws
Exception
{
p
ublic
static
MockZooKeeper
createMockZooKeeper
()
throws
Exception
{
MockZooKeeper
zk
=
MockZooKeeper
.
newInstance
(
MoreExecutors
.
sameThreadExecutor
());
List
<
ACL
>
dummyAclList
=
new
ArrayList
<
ACL
>(
0
);
...
...
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceThrottlingTest.java
浏览文件 @
378217d4
...
...
@@ -69,12 +69,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
*/
@Test
public
void
testThrottlingLookupRequestSemaphore
()
throws
Exception
{
// create configuration znode
ZkUtils
.
createFullPathOptimistic
(
mockZookKeeper
,
BROKER_SERVICE_CONFIGURATION_PATH
,
"{}"
.
getBytes
(),
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode
();
BrokerService
service
=
pulsar
.
getBrokerService
();
assertNotEquals
(
service
.
lookupRequestSemaphore
.
get
().
availablePermits
(),
0
);
admin
.
brokers
().
updateDynamicConfiguration
(
"maxConcurrentLookupRequest"
,
Integer
.
toString
(
0
));
...
...
@@ -91,12 +85,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
@Test
public
void
testLookupThrottlingForClientByBroker0Permit
()
throws
Exception
{
// create configuration znode
ZkUtils
.
createFullPathOptimistic
(
mockZookKeeper
,
BROKER_SERVICE_CONFIGURATION_PATH
,
"{}"
.
getBytes
(),
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode
();
final
String
topicName
=
"persistent://prop/usw/my-ns/newTopic"
;
com
.
yahoo
.
pulsar
.
client
.
api
.
ClientConfiguration
clientConf
=
new
com
.
yahoo
.
pulsar
.
client
.
api
.
ClientConfiguration
();
...
...
@@ -140,13 +128,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
*/
@Test
public
void
testLookupThrottlingForClientByBroker
()
throws
Exception
{
// create configuration znode
ZkUtils
.
createFullPathOptimistic
(
mockZookKeeper
,
BROKER_SERVICE_CONFIGURATION_PATH
,
"{}"
.
getBytes
(),
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode
();
final
String
topicName
=
"persistent://prop/usw/my-ns/newTopic"
;
com
.
yahoo
.
pulsar
.
client
.
api
.
ClientConfiguration
clientConf
=
new
com
.
yahoo
.
pulsar
.
client
.
api
.
ClientConfiguration
();
...
...
@@ -211,12 +192,6 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
@Test
public
void
testLookupThrottlingForClientByBrokerInternalRetry
()
throws
Exception
{
// create configuration znode
ZkUtils
.
createFullPathOptimistic
(
mockZookKeeper
,
BROKER_SERVICE_CONFIGURATION_PATH
,
"{}"
.
getBytes
(),
ZooDefs
.
Ids
.
OPEN_ACL_UNSAFE
,
CreateMode
.
PERSISTENT
);
// Now, znode is created: set the watch and listener on the znode
setWatchOnThrottlingZnode
();
final
String
topicName
=
"persistent://prop/usw/my-ns/newTopic"
;
com
.
yahoo
.
pulsar
.
client
.
api
.
ClientConfiguration
clientConf
=
new
com
.
yahoo
.
pulsar
.
client
.
api
.
ClientConfiguration
();
...
...
@@ -293,11 +268,4 @@ public class BrokerServiceThrottlingTest extends BrokerTestBase {
}
}
private
void
setWatchOnThrottlingZnode
()
throws
Exception
{
Method
updateConfigListenerMethod
=
BrokerService
.
class
.
getDeclaredMethod
(
"updateConfigurationAndRegisterListeners"
);
updateConfigListenerMethod
.
setAccessible
(
true
);
updateConfigListenerMethod
.
invoke
(
pulsar
.
getBrokerService
());
}
}
\ No newline at end of file
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
浏览文件 @
378217d4
...
...
@@ -15,6 +15,7 @@
*/
package
com.yahoo.pulsar.broker.service
;
import
static
com
.
yahoo
.
pulsar
.
broker
.
auth
.
MockedPulsarServiceBaseTest
.
createMockZooKeeper
;
import
static
org
.
mockito
.
Matchers
.
any
;
import
static
org
.
mockito
.
Matchers
.
anyObject
;
import
static
org
.
mockito
.
Matchers
.
matches
;
...
...
@@ -91,7 +92,7 @@ public class PersistentDispatcherFailoverConsumerTest {
mlFactoryMock
=
mock
(
ManagedLedgerFactory
.
class
);
doReturn
(
mlFactoryMock
).
when
(
pulsar
).
getManagedLedgerFactory
();
ZooKeeper
mockZk
=
mock
(
ZooKeeper
.
class
);
ZooKeeper
mockZk
=
createMockZooKeeper
(
);
doReturn
(
mockZk
).
when
(
pulsar
).
getZkClient
();
configCacheService
=
mock
(
ConfigurationCacheService
.
class
);
...
...
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java
浏览文件 @
378217d4
...
...
@@ -15,6 +15,7 @@
*/
package
com.yahoo.pulsar.broker.service
;
import
static
com
.
yahoo
.
pulsar
.
broker
.
auth
.
MockedPulsarServiceBaseTest
.
createMockZooKeeper
;
import
static
org
.
mockito
.
Matchers
.
any
;
import
static
org
.
mockito
.
Mockito
.
doReturn
;
import
static
org
.
mockito
.
Mockito
.
mock
;
...
...
@@ -84,7 +85,7 @@ public class PersistentTopicConcurrentTest extends MockedBookKeeperTestCase {
mlFactoryMock
=
factory
;
doReturn
(
mlFactoryMock
).
when
(
pulsar
).
getManagedLedgerFactory
();
ZooKeeper
mockZk
=
mock
(
ZooKeeper
.
class
);
ZooKeeper
mockZk
=
createMockZooKeeper
(
);
doReturn
(
mockZk
).
when
(
pulsar
).
getZkClient
();
brokerService
=
spy
(
new
BrokerService
(
pulsar
));
...
...
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java
浏览文件 @
378217d4
...
...
@@ -15,6 +15,7 @@
*/
package
com.yahoo.pulsar.broker.service
;
import
static
com
.
yahoo
.
pulsar
.
broker
.
auth
.
MockedPulsarServiceBaseTest
.
createMockZooKeeper
;
import
static
org
.
mockito
.
Matchers
.
any
;
import
static
org
.
mockito
.
Matchers
.
anyObject
;
import
static
org
.
mockito
.
Matchers
.
anyString
;
...
...
@@ -130,7 +131,7 @@ public class PersistentTopicTest {
mlFactoryMock
=
mock
(
ManagedLedgerFactory
.
class
);
doReturn
(
mlFactoryMock
).
when
(
pulsar
).
getManagedLedgerFactory
();
ZooKeeper
mockZk
=
mock
(
ZooKeeper
.
class
);
ZooKeeper
mockZk
=
createMockZooKeeper
(
);
doReturn
(
mockZk
).
when
(
pulsar
).
getZkClient
();
configCacheService
=
mock
(
ConfigurationCacheService
.
class
);
...
...
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java
浏览文件 @
378217d4
...
...
@@ -50,7 +50,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import
org.apache.bookkeeper.mledger.ManagedLedgerException
;
import
org.apache.bookkeeper.mledger.ManagedLedgerFactory
;
import
org.apache.bookkeeper.mledger.impl.PositionImpl
;
import
org.apache.zookeeper.KeeperException.NoNodeException
;
import
org.apache.zookeeper.ZooKeeper
;
import
org.mockito.Mockito
;
import
org.mockito.invocation.InvocationOnMock
;
...
...
@@ -95,6 +94,7 @@ import io.netty.buffer.ByteBuf;
import
io.netty.buffer.Unpooled
;
import
io.netty.channel.embedded.EmbeddedChannel
;
import
io.netty.handler.codec.LengthFieldBasedFrameDecoder
;
import
static
com
.
yahoo
.
pulsar
.
broker
.
auth
.
MockedPulsarServiceBaseTest
.
createMockZooKeeper
;
/**
*/
...
...
@@ -135,7 +135,7 @@ public class ServerCnxTest {
mlFactoryMock
=
mock
(
ManagedLedgerFactory
.
class
);
doReturn
(
mlFactoryMock
).
when
(
pulsar
).
getManagedLedgerFactory
();
ZooKeeper
mockZk
=
mock
(
ZooKeeper
.
class
);
ZooKeeper
mockZk
=
createMockZooKeeper
(
);
doReturn
(
mockZk
).
when
(
pulsar
).
getZkClient
();
configCacheService
=
mock
(
ConfigurationCacheService
.
class
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录