Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
362be2a9
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
362be2a9
编写于
6月 28, 2019
作者:
B
Boyang Jerry Peng
提交者:
GitHub
6月 28, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix get partitioned topics for non-persistent topics (#4613)
上级
5a10c4ac
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
41 addition
and
2 deletion
+41
-2
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
...in/java/org/apache/pulsar/broker/admin/AdminResource.java
+1
-1
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
.../org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+40
-1
未找到文件。
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
浏览文件 @
362be2a9
...
...
@@ -583,7 +583,7 @@ public abstract class AdminResource extends PulsarWebResource {
String
partitionedTopicPath
=
path
(
PARTITIONED_TOPIC_PATH_ZNODE
,
namespaceName
.
toString
(),
topicDomain
.
value
());
List
<
String
>
topics
=
globalZk
().
getChildren
(
partitionedTopicPath
,
false
);
partitionedTopics
=
topics
.
stream
()
.
map
(
s
->
String
.
format
(
"
persistent://%s/%s"
,
namespaceName
.
toString
(),
decode
(
s
)))
.
map
(
s
->
String
.
format
(
"
%s://%s/%s"
,
topicDomain
.
value
()
,
namespaceName
.
toString
(),
decode
(
s
)))
.
collect
(
Collectors
.
toList
());
}
catch
(
KeeperException
.
NoNodeException
e
)
{
// NoNode means there are no partitioned topics in this domain for this namespace
...
...
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
浏览文件 @
362be2a9
...
...
@@ -19,16 +19,21 @@
package
org.apache.pulsar.broker.admin
;
import
com.google.common.collect.Sets
;
import
org.apache.pulsar.broker.admin.v2.NonPersistentTopics
;
import
org.apache.pulsar.broker.admin.v2.PersistentTopics
;
import
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
;
import
org.apache.pulsar.broker.authentication.AuthenticationDataHttps
;
import
org.apache.pulsar.broker.web.PulsarWebResource
;
import
org.apache.pulsar.broker.web.RestException
;
import
org.apache.pulsar.client.admin.PulsarAdminException
;
import
org.apache.pulsar.client.api.MessageId
;
import
org.apache.pulsar.client.impl.MessageIdImpl
;
import
org.apache.pulsar.common.naming.TopicDomain
;
import
org.apache.pulsar.common.naming.TopicName
;
import
org.apache.pulsar.common.partition.PartitionedTopicMetadata
;
import
org.apache.pulsar.common.policies.data.ClusterData
;
import
org.apache.pulsar.common.policies.data.TenantInfo
;
import
org.apache.zookeeper.KeeperException
;
import
org.testng.Assert
;
import
org.testng.annotations.AfterMethod
;
import
org.testng.annotations.BeforeClass
;
...
...
@@ -53,6 +58,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
private
final
String
testNamespace
=
"my-namespace"
;
protected
Field
uriField
;
protected
UriInfo
uriInfo
;
private
NonPersistentTopics
nonPersistentTopic
;
@BeforeClass
public
void
initPersistentTopics
()
throws
Exception
{
...
...
@@ -75,9 +81,25 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
doReturn
(
false
).
when
(
persistentTopics
).
isRequestHttps
();
doReturn
(
null
).
when
(
persistentTopics
).
originalPrincipal
();
doReturn
(
"test"
).
when
(
persistentTopics
).
clientAppId
();
doReturn
(
"persistent"
).
when
(
persistentTopics
).
domain
();
doReturn
(
TopicDomain
.
persistent
.
value
()
).
when
(
persistentTopics
).
domain
();
doNothing
().
when
(
persistentTopics
).
validateAdminAccessForTenant
(
this
.
testTenant
);
doReturn
(
mock
(
AuthenticationDataHttps
.
class
)).
when
(
persistentTopics
).
clientAuthData
();
nonPersistentTopic
=
spy
(
new
NonPersistentTopics
());
nonPersistentTopic
.
setServletContext
(
new
MockServletContext
());
nonPersistentTopic
.
setPulsar
(
pulsar
);
doReturn
(
mockZookKeeper
).
when
(
nonPersistentTopic
).
globalZk
();
doReturn
(
mockZookKeeper
).
when
(
nonPersistentTopic
).
localZk
();
doReturn
(
pulsar
.
getConfigurationCache
().
propertiesCache
()).
when
(
nonPersistentTopic
).
tenantsCache
();
doReturn
(
pulsar
.
getConfigurationCache
().
policiesCache
()).
when
(
nonPersistentTopic
).
policiesCache
();
doReturn
(
false
).
when
(
nonPersistentTopic
).
isRequestHttps
();
doReturn
(
null
).
when
(
nonPersistentTopic
).
originalPrincipal
();
doReturn
(
"test"
).
when
(
nonPersistentTopic
).
clientAppId
();
doReturn
(
TopicDomain
.
non_persistent
.
value
()).
when
(
nonPersistentTopic
).
domain
();
doNothing
().
when
(
nonPersistentTopic
).
validateAdminAccessForTenant
(
this
.
testTenant
);
doReturn
(
mock
(
AuthenticationDataHttps
.
class
)).
when
(
nonPersistentTopic
).
clientAuthData
();
admin
.
clusters
().
createCluster
(
"use"
,
new
ClusterData
(
"http://broker-use.com:"
+
BROKER_WEBSERVICE_PORT
));
admin
.
clusters
().
createCluster
(
"test"
,
new
ClusterData
(
"http://broker-use.com:"
+
BROKER_WEBSERVICE_PORT
));
admin
.
tenants
().
createTenant
(
this
.
testTenant
,
...
...
@@ -159,4 +181,21 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
throw
e
;
}
}
@Test
public
void
testGetPartitionedTopicsList
()
throws
KeeperException
,
InterruptedException
,
PulsarAdminException
{
persistentTopics
.
createPartitionedTopic
(
testTenant
,
testNamespace
,
"test-topic1"
,
3
);
nonPersistentTopic
.
createPartitionedTopic
(
testTenant
,
testNamespace
,
"test-topic2"
,
3
);
List
<
String
>
persistentPartitionedTopics
=
persistentTopics
.
getPartitionedTopicList
(
testTenant
,
testNamespace
);
Assert
.
assertEquals
(
persistentPartitionedTopics
.
size
(),
1
);
Assert
.
assertEquals
(
TopicName
.
get
(
persistentPartitionedTopics
.
get
(
0
)).
getDomain
().
value
(),
TopicDomain
.
persistent
.
value
());
List
<
String
>
nonPersistentPartitionedTopics
=
nonPersistentTopic
.
getPartitionedTopicList
(
testTenant
,
testNamespace
);
Assert
.
assertEquals
(
nonPersistentPartitionedTopics
.
size
(),
1
);
Assert
.
assertEquals
(
TopicName
.
get
(
nonPersistentPartitionedTopics
.
get
(
0
)).
getDomain
().
value
(),
TopicDomain
.
non_persistent
.
value
());
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录