Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
ca5a445f
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
ca5a445f
编写于
5月 13, 2017
作者:
B
Brad McMillen
提交者:
Matteo Merli
5月 13, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add ability to filter brokers based on their build version (#397)
上级
1c12849a
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
513 addition
and
14 deletion
+513
-14
conf/broker.conf
conf/broker.conf
+4
-0
pom.xml
pom.xml
+6
-0
pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
...in/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
+12
-0
pulsar-broker/pom.xml
pulsar-broker/pom.xml
+11
-1
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/LocalBrokerData.java
...rc/main/java/com/yahoo/pulsar/broker/LocalBrokerData.java
+11
-0
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java
.../src/main/java/com/yahoo/pulsar/broker/PulsarService.java
+7
-1
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerFilter.java
...ava/com/yahoo/pulsar/broker/loadbalance/BrokerFilter.java
+6
-2
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerFilterBadVersionException.java
...r/broker/loadbalance/BrokerFilterBadVersionException.java
+30
-0
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerFilterException.java
...ahoo/pulsar/broker/loadbalance/BrokerFilterException.java
+30
-0
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/BrokerVersionFilter.java
...o/pulsar/broker/loadbalance/impl/BrokerVersionFilter.java
+152
-0
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
...ulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+26
-10
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
...pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+2
-0
pulsar-broker/src/main/java/com/yahoo/pulsar/utils/PulsarBrokerVersionStringUtils.java
...om/yahoo/pulsar/utils/PulsarBrokerVersionStringUtils.java
+86
-0
pulsar-broker/src/main/resources/pulsar-broker-version.properties
...roker/src/main/resources/pulsar-broker-version.properties
+1
-0
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/BrokerVersionFilterTest.java
...oo/pulsar/broker/loadbalance/BrokerVersionFilterTest.java
+79
-0
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/AddMissingPatchVersionTest.java
...hoo/pulsar/broker/service/AddMissingPatchVersionTest.java
+41
-0
pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/loadbalancer/LoadReport.java
.../pulsar/common/policies/data/loadbalancer/LoadReport.java
+9
-0
未找到文件。
conf/broker.conf
浏览文件 @
ca5a445f
...
...
@@ -80,6 +80,10 @@ clientLibraryVersionCheckAllowUnversioned=true
# to service discovery health checks
statusFilePath
=
# If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to
*
use
only
brokers
running
the
latest
software
version
(
to
minimize
impact
to
bundles
)
preferLaterVersions
=
false
;
### --- Authentication --- ###
# Enable TLS
...
...
pom.xml
浏览文件 @
ca5a445f
...
...
@@ -405,6 +405,12 @@ flexible messaging model and an intuitive client API.</description>
<version>
${athenz.version}
</version>
</dependency>
<dependency>
<groupId>
com.github.zafarkhaja
</groupId>
<artifactId>
java-semver
</artifactId>
<version>
0.9.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.spark
</groupId>
<artifactId>
spark-streaming_2.10
</artifactId>
...
...
pulsar-broker-common/src/main/java/com/yahoo/pulsar/broker/ServiceConfiguration.java
浏览文件 @
ca5a445f
...
...
@@ -264,6 +264,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Name of load manager to use
@FieldContext
(
dynamic
=
true
)
private
String
loadManagerClassName
=
"com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl"
;
// If true, (and ModularLoadManagerImpl is being used), the load manager will attempt to
// use only brokers running the latest software version (to minimize impact to bundles)
@FieldContext
(
dynamic
=
true
)
private
boolean
preferLaterVersions
=
false
;
public
String
getZookeeperServers
()
{
return
zookeeperServers
;
...
...
@@ -979,4 +983,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
public
void
setLoadManagerClassName
(
String
loadManagerClassName
)
{
this
.
loadManagerClassName
=
loadManagerClassName
;
}
public
boolean
isPreferLaterVersions
()
{
return
preferLaterVersions
;
}
public
void
setPreferLaterVersions
(
boolean
preferLaterVersions
)
{
this
.
preferLaterVersions
=
preferLaterVersions
;
}
}
pulsar-broker/pom.xml
浏览文件 @
ca5a445f
...
...
@@ -31,7 +31,6 @@
<name>
Pulsar Broker
</name>
<dependencies>
<dependency>
<groupId>
commons-codec
</groupId>
<artifactId>
commons-codec
</artifactId>
...
...
@@ -198,6 +197,11 @@
<artifactId>
gson
</artifactId>
</dependency>
<dependency>
<groupId>
com.github.zafarkhaja
</groupId>
<artifactId>
java-semver
</artifactId>
</dependency>
<dependency>
<groupId>
org.mockito
</groupId>
<artifactId>
mockito-core
</artifactId>
...
...
@@ -266,5 +270,11 @@
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>
src/main/resources
</directory>
<filtering>
true
</filtering>
</resource>
</resources>
</build>
</project>
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/LocalBrokerData.java
浏览文件 @
ca5a445f
...
...
@@ -71,6 +71,9 @@ public class LocalBrokerData extends JSONWritable implements ServiceLookupData {
// The bundles lost since the last invocation of update.
private
Set
<
String
>
lastBundleLosses
;
// The version string that this broker is running, obtained from the Maven build artifact in the POM
private
String
brokerVersionString
;
// For JSON only.
public
LocalBrokerData
()
{
this
(
null
,
null
,
null
,
null
);
...
...
@@ -337,6 +340,14 @@ public class LocalBrokerData extends JSONWritable implements ServiceLookupData {
this
.
msgRateOut
=
msgRateOut
;
}
public
void
setBrokerVersionString
(
String
brokerVersionString
)
{
this
.
brokerVersionString
=
brokerVersionString
;
}
public
String
getBrokerVersionString
()
{
return
brokerVersionString
;
}
@Override
public
String
getWebServiceUrl
()
{
return
webServiceUrl
;
...
...
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java
浏览文件 @
ca5a445f
...
...
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.Condition;
import
java.util.concurrent.locks.ReentrantLock
;
import
java.util.function.Supplier
;
import
com.yahoo.pulsar.utils.PulsarBrokerVersionStringUtils
;
import
org.apache.bookkeeper.mledger.ManagedLedgerFactory
;
import
org.apache.bookkeeper.util.OrderedSafeExecutor
;
import
org.apache.bookkeeper.util.ZkUtils
;
...
...
@@ -111,6 +112,7 @@ public class PulsarService implements AutoCloseable {
private
final
String
webServiceAddressTls
;
private
final
String
brokerServiceUrl
;
private
final
String
brokerServiceUrlTls
;
private
final
String
brokerVersion
;
private
final
MessagingServiceShutdownHook
shutdownService
;
...
...
@@ -133,6 +135,7 @@ public class PulsarService implements AutoCloseable {
this
.
webServiceAddressTls
=
webAddressTls
(
config
);
this
.
brokerServiceUrl
=
brokerUrl
(
config
);
this
.
brokerServiceUrlTls
=
brokerUrlTls
(
config
);
this
.
brokerVersion
=
PulsarBrokerVersionStringUtils
.
getNormalizedVersionString
();
this
.
config
=
config
;
this
.
shutdownService
=
new
MessagingServiceShutdownHook
(
this
);
loadManagerExecutor
=
Executors
.
newSingleThreadScheduledExecutor
();
...
...
@@ -256,7 +259,7 @@ public class PulsarService implements AutoCloseable {
// needs load management service
this
.
startNamespaceService
();
LOG
.
info
(
"Starting Pulsar Broker service
"
);
LOG
.
info
(
"Starting Pulsar Broker service
; version: '{}'"
,
(
brokerVersion
!=
null
?
brokerVersion
:
"unknown"
)
);
brokerService
.
start
();
this
.
webService
=
new
WebService
(
this
);
...
...
@@ -648,4 +651,7 @@ public class PulsarService implements AutoCloseable {
return
loadManager
;
}
public
String
getBrokerVersion
()
{
return
brokerVersion
;
}
}
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerFilter.java
浏览文件 @
ca5a445f
...
...
@@ -18,6 +18,7 @@ package com.yahoo.pulsar.broker.loadbalance;
import
java.util.Set
;
import
com.yahoo.pulsar.broker.BundleData
;
import
com.yahoo.pulsar.broker.PulsarService
;
import
com.yahoo.pulsar.broker.ServiceConfiguration
;
/**
...
...
@@ -40,6 +41,9 @@ public interface BrokerFilter {
* The load data from the leader broker.
* @param conf
* The service configuration.
* @throws BrokerFilterException
* There was an error in the pipeline and the brokers should be reset to their original value
*/
void
filter
(
Set
<
String
>
brokers
,
BundleData
bundleToAssign
,
LoadData
loadData
,
ServiceConfiguration
conf
);
}
public
void
filter
(
Set
<
String
>
brokers
,
BundleData
bundleToAssign
,
LoadData
loadData
,
ServiceConfiguration
conf
)
throws
BrokerFilterException
;
}
\ No newline at end of file
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerFilterBadVersionException.java
0 → 100644
浏览文件 @
ca5a445f
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.yahoo.pulsar.broker.loadbalance
;
public
class
BrokerFilterBadVersionException
extends
BrokerFilterException
{
public
BrokerFilterBadVersionException
(
String
msg
)
{
super
(
msg
);
}
public
BrokerFilterBadVersionException
(
Throwable
t
)
{
super
(
t
);
}
public
BrokerFilterBadVersionException
(
String
msg
,
Throwable
t
)
{
super
(
msg
,
t
);
}
}
\ No newline at end of file
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/BrokerFilterException.java
0 → 100644
浏览文件 @
ca5a445f
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.yahoo.pulsar.broker.loadbalance
;
public
class
BrokerFilterException
extends
Exception
{
public
BrokerFilterException
(
String
msg
)
{
super
(
msg
);
}
public
BrokerFilterException
(
Throwable
t
)
{
super
(
t
);
}
public
BrokerFilterException
(
String
msg
,
Throwable
t
)
{
super
(
msg
,
t
);
}
}
\ No newline at end of file
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/BrokerVersionFilter.java
0 → 100644
浏览文件 @
ca5a445f
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.yahoo.pulsar.broker.loadbalance.impl
;
import
java.util.*
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
import
com.yahoo.pulsar.broker.PulsarService
;
import
com.yahoo.pulsar.broker.loadbalance.BrokerFilterBadVersionException
;
import
org.apache.bookkeeper.mledger.ManagedLedgerException
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
com.github.zafarkhaja.semver.Version
;
import
com.yahoo.pulsar.broker.BrokerData
;
import
com.yahoo.pulsar.broker.loadbalance.BrokerFilter
;
import
com.yahoo.pulsar.broker.BundleData
;
import
com.yahoo.pulsar.broker.ServiceConfiguration
;
import
com.yahoo.pulsar.broker.loadbalance.LoadData
;
public
class
BrokerVersionFilter
implements
BrokerFilter
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
BrokerVersionFilter
.
class
);
/**
* Get the most recent broker version number from the load reports of all the running brokers. The version
* number is from the build artifact in the pom and got added to the package when it was built by Maven
*
* @param brokers
* The brokers to choose the latest version string from.
* @param loadData
* The load data from the leader broker (contains the load reports which in turn contain the version string).
* @return The most recent broker version
* @throws BrokerFilterBadVersionException
* If the most recent version is undefined (e.g., a bad broker version was encountered or a broker
* does not have a version string in its load report.
*/
public
Version
getLatestVersionNumber
(
Set
<
String
>
brokers
,
LoadData
loadData
)
throws
BrokerFilterBadVersionException
{
if
(
null
==
brokers
)
{
throw
new
BrokerFilterBadVersionException
(
"Unable to determine latest version since broker set was null"
);
}
if
(
brokers
.
size
()
==
0
)
{
throw
new
BrokerFilterBadVersionException
(
"Unable to determine latest version since broker set was empty"
);
}
if
(
null
==
loadData
)
{
throw
new
BrokerFilterBadVersionException
(
"Unable to determine latest version since loadData was null"
);
}
Version
latestVersion
=
null
;
for
(
String
broker
:
brokers
)
{
BrokerData
data
=
loadData
.
getBrokerData
().
get
(
broker
);
if
(
null
==
data
)
{
LOG
.
warn
(
"No broker data for broker [{}]; disabling PreferLaterVersions feature"
,
broker
);
// trigger the ModularLoadManager to reset all the brokers to the original set
throw
new
BrokerFilterBadVersionException
(
"No broker data for broker \""
+
broker
+
"\""
);
}
String
brokerVersion
=
data
.
getLocalData
().
getBrokerVersionString
();
if
(
null
==
brokerVersion
||
brokerVersion
.
length
()
==
0
)
{
LOG
.
warn
(
"No version string in load report for broker [{}]; disabling PreferLaterVersions feature"
,
broker
);
// trigger the ModularLoadManager to reset all the brokers to the original set
throw
new
BrokerFilterBadVersionException
(
"No version string in load report for broker \""
+
broker
+
"\""
);
}
Version
brokerVersionVersion
=
null
;
try
{
brokerVersionVersion
=
Version
.
valueOf
(
brokerVersion
);
}
catch
(
Exception
x
)
{
LOG
.
warn
(
"Invalid version string in load report for broker [{}]: [{}]; disabling PreferLaterVersions feature"
,
broker
,
brokerVersion
);
// trigger the ModularLoadManager to reset all the brokers to the original set
throw
new
BrokerFilterBadVersionException
(
"Invalid version string in load report for broker \""
+
broker
+
"\": \""
+
brokerVersion
+
"\")"
);
}
if
(
null
==
latestVersion
)
{
latestVersion
=
brokerVersionVersion
;
}
else
if
(
Version
.
BUILD_AWARE_ORDER
.
compare
(
latestVersion
,
brokerVersionVersion
)
<
0
)
{
latestVersion
=
brokerVersionVersion
;
}
}
if
(
null
==
latestVersion
)
{
throw
new
BrokerFilterBadVersionException
(
"Unable to determine latest broker version"
);
}
return
latestVersion
;
}
/**
* From the given set of available broker candidates, filter those using the version numbers.
*
* @param brokers
* The currently available brokers that have not already been filtered.
* @param bundleToAssign
* The data for the bundle to assign.
* @param loadData
* The load data from the leader broker.
* @param conf
* The service configuration.
*/
public
void
filter
(
Set
<
String
>
brokers
,
BundleData
bundleToAssign
,
LoadData
loadData
,
ServiceConfiguration
conf
)
throws
BrokerFilterBadVersionException
{
if
(
!
conf
.
isPreferLaterVersions
())
{
return
;
}
com
.
github
.
zafarkhaja
.
semver
.
Version
latestVersion
=
null
;
try
{
latestVersion
=
getLatestVersionNumber
(
brokers
,
loadData
);
LOG
.
info
(
"Latest broker version found was [{}]"
,
latestVersion
);
}
catch
(
Exception
x
)
{
LOG
.
warn
(
"Disabling PreferLaterVersions feature; reason: "
+
x
.
getMessage
());
throw
new
BrokerFilterBadVersionException
(
"Cannot determine newest broker version: "
+
x
.
getMessage
());
}
int
numBrokersLatestVersion
=
0
;
int
numBrokersOlderVersion
=
0
;
Iterator
<
String
>
brokerIterator
=
brokers
.
iterator
();
while
(
brokerIterator
.
hasNext
()
)
{
String
broker
=
brokerIterator
.
next
();
BrokerData
data
=
loadData
.
getBrokerData
().
get
(
broker
);
String
brokerVersion
=
data
.
getLocalData
().
getBrokerVersionString
();
com
.
github
.
zafarkhaja
.
semver
.
Version
brokerVersionVersion
=
Version
.
valueOf
(
brokerVersion
);
if
(
brokerVersionVersion
.
equals
(
latestVersion
)
)
{
LOG
.
debug
(
"Broker [{}] is running the latest version ([{}])"
,
broker
,
brokerVersion
);
++
numBrokersLatestVersion
;
}
else
{
LOG
.
info
(
"Broker [{}] is running an older version ([{}]); latest version is [{}]"
,
broker
,
brokerVersion
,
latestVersion
);
++
numBrokersOlderVersion
;
brokerIterator
.
remove
();
}
}
if
(
numBrokersOlderVersion
==
0
)
{
LOG
.
info
(
"All {} brokers are running the latest version [{}]"
,
numBrokersLatestVersion
,
latestVersion
);
}
}
}
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
浏览文件 @
ca5a445f
...
...
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
import
com.yahoo.pulsar.broker.loadbalance.*
;
import
org.apache.bookkeeper.util.ZkUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.SystemUtils
;
...
...
@@ -49,13 +50,6 @@ import com.yahoo.pulsar.broker.PulsarService;
import
com.yahoo.pulsar.broker.ServiceConfiguration
;
import
com.yahoo.pulsar.broker.TimeAverageBrokerData
;
import
com.yahoo.pulsar.broker.TimeAverageMessageData
;
import
com.yahoo.pulsar.broker.loadbalance.BrokerFilter
;
import
com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage
;
import
com.yahoo.pulsar.broker.loadbalance.LoadData
;
import
com.yahoo.pulsar.broker.loadbalance.LoadManager
;
import
com.yahoo.pulsar.broker.loadbalance.LoadSheddingStrategy
;
import
com.yahoo.pulsar.broker.loadbalance.ModularLoadManager
;
import
com.yahoo.pulsar.broker.loadbalance.ModularLoadManagerStrategy
;
import
com.yahoo.pulsar.common.naming.ServiceUnitId
;
import
com.yahoo.pulsar.common.policies.data.ResourceQuota
;
import
com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats
;
...
...
@@ -219,10 +213,12 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
pulsar
.
getBrokerServiceUrl
(),
pulsar
.
getBrokerServiceUrlTls
());
localData
=
new
LocalBrokerData
(
pulsar
.
getWebServiceAddress
(),
pulsar
.
getWebServiceAddressTls
(),
pulsar
.
getBrokerServiceUrl
(),
pulsar
.
getBrokerServiceUrlTls
());
localData
.
setBrokerVersionString
(
pulsar
.
getBrokerVersion
());
placementStrategy
=
ModularLoadManagerStrategy
.
create
(
conf
);
policies
=
new
SimpleResourceAllocationPolicies
(
pulsar
);
this
.
pulsar
=
pulsar
;
zkClient
=
pulsar
.
getZkClient
();
filterPipeline
.
add
(
new
BrokerVersionFilter
());
}
/**
...
...
@@ -539,10 +535,30 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
log
.
info
(
"{} brokers being considered for assignment of {}"
,
brokerCandidateCache
.
size
(),
bundle
);
// Use the filter pipeline to finalize broker candidates.
for
(
BrokerFilter
filter
:
filterPipeline
)
{
filter
.
filter
(
brokerCandidateCache
,
data
,
loadData
,
conf
);
try
{
for
(
BrokerFilter
filter
:
filterPipeline
)
{
filter
.
filter
(
brokerCandidateCache
,
data
,
loadData
,
conf
);
}
}
catch
(
BrokerFilterException
x
)
{
// restore the list of brokers to the full set
LoadManagerShared
.
applyPolicies
(
serviceUnit
,
policies
,
brokerCandidateCache
,
getAvailableBrokers
());
}
if
(
brokerCandidateCache
.
isEmpty
()
)
{
// restore the list of brokers to the full set
LoadManagerShared
.
applyPolicies
(
serviceUnit
,
policies
,
brokerCandidateCache
,
getAvailableBrokers
());
}
// Choose a broker among the potentially smaller filtered list, when possible
String
broker
=
placementStrategy
.
selectBroker
(
brokerCandidateCache
,
data
,
loadData
,
conf
);
final
double
overloadThreshold
=
conf
.
getLoadBalancerBrokerOverloadedThresholdPercentage
()
/
100.0
;
final
double
maxUsage
=
loadData
.
getBrokerData
().
get
(
broker
).
getLocalData
().
getMaxResourceUsage
();
if
(
maxUsage
>
overloadThreshold
)
{
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared
.
applyPolicies
(
serviceUnit
,
policies
,
brokerCandidateCache
,
getAvailableBrokers
());
broker
=
placementStrategy
.
selectBroker
(
brokerCandidateCache
,
data
,
loadData
,
conf
);
}
final
String
broker
=
placementStrategy
.
selectBroker
(
brokerCandidateCache
,
data
,
loadData
,
conf
);
// Add new bundle to preallocated.
loadData
.
getBrokerData
().
get
(
broker
).
getPreallocatedBundleData
().
put
(
bundle
,
data
);
...
...
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
浏览文件 @
ca5a445f
...
...
@@ -1081,6 +1081,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
pulsar
.
getBrokerServiceUrl
(),
pulsar
.
getBrokerServiceUrlTls
());
loadReport
.
setName
(
String
.
format
(
"%s:%s"
,
pulsar
.
getAdvertisedAddress
(),
pulsar
.
getConfiguration
().
getWebServicePort
()));
loadReport
.
setBrokerVersionString
(
pulsar
.
getBrokerVersion
());
SystemResourceUsage
systemResourceUsage
=
this
.
getSystemResourceUsage
();
loadReport
.
setOverLoaded
(
isAboveLoadLevel
(
systemResourceUsage
,
this
.
getLoadBalancerBrokerOverloadedThresholdPercentage
()));
...
...
pulsar-broker/src/main/java/com/yahoo/pulsar/utils/PulsarBrokerVersionStringUtils.java
0 → 100644
浏览文件 @
ca5a445f
package
com.yahoo.pulsar.utils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.Properties
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
public
class
PulsarBrokerVersionStringUtils
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
PulsarBrokerVersionStringUtils
.
class
);
private
static
final
String
RESOURCE_NAME
=
"pulsar-broker-version.properties"
;
private
static
final
Pattern
majorMinorPatchPattern
=
Pattern
.
compile
(
"([1-9]+[0-9]*)\\.([1-9]+[0-9]*)\\.([1-9]+[0-9]*)(.*)"
);
// If the version string does not contain a patch version, add one so the
// version becomes valid according to the SemVer library (see https://github.com/zafarkhaja/jsemver).
// This method (and it's only call above in the ctor) may be removed when SemVer accepts null patch versions
public
static
String
fixVersionString
(
String
version
)
{
if
(
null
==
version
)
{
return
null
;
}
Matcher
majorMinorPatchMatcher
=
majorMinorPatchPattern
.
matcher
(
version
);
if
(
majorMinorPatchMatcher
.
matches
()
)
{
// this is a valid version, containing a major, a minor, and a patch version (and optionally
// a release candidate version and/or build metadata)
return
version
;
}
else
{
// the patch version is missing, so add one ("0")
Pattern
pattern2
=
Pattern
.
compile
(
"([1-9]+[0-9]*)\\.([1-9]+[0-9]*)(.*)"
);
Matcher
matcher2
=
pattern2
.
matcher
(
version
);
if
(
matcher2
.
matches
())
{
int
startMajorVersion
=
matcher2
.
start
(
1
);
int
stopMinorVersion
=
matcher2
.
end
(
2
);
int
startReleaseCandidate
=
matcher2
.
start
(
3
);
String
prefix
=
new
String
(
version
.
getBytes
(),
startMajorVersion
,
(
stopMinorVersion
-
startMajorVersion
));
String
patchVersion
=
".0"
;
String
suffix
=
new
String
(
version
.
getBytes
(),
startReleaseCandidate
,
version
.
length
()
-
startReleaseCandidate
);
return
(
prefix
+
patchVersion
+
suffix
);
}
else
{
// This is an invalid version, let the JSemVer library fail when it parses it
return
version
;
}
}
}
/**
* Looks for a resource in the jar which is expected to be a java.util.Properties, then
* extract a specific property value.
*
* @return the property value, or null if the resource does not exist or the resource
* is not a valid java.util.Properties or the resource does not contain the
* named property
*/
private
static
String
getPropertyFromResource
(
String
resource
,
String
propertyName
)
{
try
{
InputStream
stream
=
PulsarBrokerVersionStringUtils
.
class
.
getClassLoader
().
getResourceAsStream
(
resource
);
if
(
stream
==
null
)
{
return
null
;
}
Properties
properties
=
new
Properties
();
try
{
properties
.
load
(
stream
);
String
propertyValue
=
(
String
)
properties
.
get
(
propertyName
);
return
propertyValue
;
}
catch
(
IOException
e
)
{
return
null
;
}
finally
{
stream
.
close
();
}
}
catch
(
Throwable
t
)
{
return
null
;
}
}
public
static
String
getNormalizedVersionString
()
{
return
fixVersionString
(
getPropertyFromResource
(
RESOURCE_NAME
,
"version"
));
}
}
pulsar-broker/src/main/resources/pulsar-broker-version.properties
0 → 100644
浏览文件 @
ca5a445f
version
=
${pom.version}
\ No newline at end of file
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/loadbalance/BrokerVersionFilterTest.java
0 → 100644
浏览文件 @
ca5a445f
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.yahoo.pulsar.broker.loadbalance
;
import
java.util.Set
;
import
java.util.TreeSet
;
import
com.yahoo.pulsar.broker.loadbalance.impl.BrokerVersionFilter
;
import
org.testng.Assert
;
import
org.testng.annotations.Test
;
import
com.yahoo.pulsar.broker.BrokerData
;
import
com.yahoo.pulsar.broker.LocalBrokerData
;
import
com.yahoo.pulsar.broker.ServiceConfiguration
;
import
com.github.zafarkhaja.semver.Version
;
public
class
BrokerVersionFilterTest
{
@Test
public
void
testLatestVersion
()
{
LoadData
loadData
=
initLoadData
();
Set
<
String
>
brokers
=
new
TreeSet
<>();
brokers
.
add
(
"broker1"
);
brokers
.
add
(
"broker2"
);
brokers
.
add
(
"broker3"
);
BrokerVersionFilter
filter
=
new
BrokerVersionFilter
();
try
{
Version
latestVersion
=
filter
.
getLatestVersionNumber
(
brokers
,
loadData
);
Assert
.
assertEquals
(
latestVersion
.
getMajorVersion
(),
1
);
Assert
.
assertEquals
(
latestVersion
.
getMinorVersion
(),
2
);
Assert
.
assertEquals
(
latestVersion
.
getPatchVersion
(),
0
);
}
catch
(
BrokerFilterBadVersionException
bad
)
{
Assert
.
fail
(
bad
.
getMessage
(),
bad
);
}
ServiceConfiguration
configuration
=
new
ServiceConfiguration
();
configuration
.
setPreferLaterVersions
(
true
);
try
{
filter
.
filter
(
brokers
,
null
,
loadData
,
configuration
);
// Only one broker is running the latest version
Assert
.
assertEquals
(
brokers
.
size
(),
1
);
}
catch
(
BrokerFilterBadVersionException
bad
)
{
Assert
.
fail
(
bad
.
getMessage
(),
bad
);
}
}
private
LoadData
initLoadData
()
{
LocalBrokerData
broker1Data
=
new
LocalBrokerData
();
broker1Data
.
setBrokerVersionString
(
"1.1.0-SNAPSHOT"
);
LocalBrokerData
broker2Data
=
new
LocalBrokerData
();
broker2Data
.
setBrokerVersionString
(
"1.1.0-SNAPSHOT"
);
LocalBrokerData
broker3Data
=
new
LocalBrokerData
();
broker3Data
.
setBrokerVersionString
(
"1.2.0-SNAPSHOT"
);
LoadData
loadData
=
new
LoadData
();
loadData
.
getBrokerData
().
put
(
"broker1"
,
new
BrokerData
(
broker1Data
));
loadData
.
getBrokerData
().
put
(
"broker2"
,
new
BrokerData
(
broker2Data
));
loadData
.
getBrokerData
().
put
(
"broker3"
,
new
BrokerData
(
broker3Data
));
return
loadData
;
}
}
\ No newline at end of file
pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/AddMissingPatchVersionTest.java
0 → 100644
浏览文件 @
ca5a445f
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.yahoo.pulsar.broker.service
;
import
com.yahoo.pulsar.broker.PulsarService
;
import
com.yahoo.pulsar.utils.PulsarBrokerVersionStringUtils
;
import
org.testng.Assert
;
import
org.testng.annotations.Test
;
public
class
AddMissingPatchVersionTest
{
@Test
public
void
testVersionStrings
()
throws
Exception
{
// Fixable versions (those lacking a patch release) get normalized with a patch release of 0
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2"
),
"1.2.0"
);
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2-SNAPSHOT"
),
"1.2.0-SNAPSHOT"
);
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2-SNAPSHOT+BUILD"
),
"1.2.0-SNAPSHOT+BUILD"
);
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2+BUILD"
),
"1.2.0+BUILD"
);
// Already valid versions get returned unchanged
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2.3"
),
"1.2.3"
);
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2.3-SNAPSHOT"
),
"1.2.3-SNAPSHOT"
);
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2.3-SNAPSHOT+BUILD"
),
"1.2.3-SNAPSHOT+BUILD"
);
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1.2.3+BUILD"
),
"1.2.3+BUILD"
);
// Non-fixable versions get returned as-is
Assert
.
assertEquals
(
PulsarBrokerVersionStringUtils
.
fixVersionString
(
"1"
),
"1"
);
}
}
\ No newline at end of file
pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/loadbalancer/LoadReport.java
浏览文件 @
ca5a445f
...
...
@@ -33,6 +33,7 @@ import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage.Re
*/
public
class
LoadReport
implements
ServiceLookupData
{
private
String
name
;
private
String
brokerVersionString
;
private
final
String
webServiceUrl
;
private
final
String
webServiceUrlTls
;
...
...
@@ -356,6 +357,14 @@ public class LoadReport implements ServiceLookupData {
this
.
preAllocatedMsgRateOut
=
preAllocatedMsgRateOut
;
}
public
void
setBrokerVersionString
(
String
brokerVersionString
)
{
this
.
brokerVersionString
=
brokerVersionString
;
}
public
String
getBrokerVersionString
()
{
return
brokerVersionString
;
}
@Override
public
String
getWebServiceUrl
()
{
return
webServiceUrl
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录