Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
superrain51
apollo
提交
4949eb55
apollo
项目概览
superrain51
/
apollo
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
apollo
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4949eb55
编写于
1月 16, 2017
作者:
张
张乐
提交者:
GitHub
1月 16, 2017
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #518 from nobodyiam/optimize-notification-controller-merge
Use cache for notification controller
上级
efcfe940
1eaf3791
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
960 addition
and
24 deletion
+960
-24
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/config/BizConfig.java
...java/com/ctrip/framework/apollo/biz/config/BizConfig.java
+24
-0
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/grayReleaseRule/GrayReleaseRulesHolder.java
...rk/apollo/biz/grayReleaseRule/GrayReleaseRulesHolder.java
+1
-7
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/repository/AppNamespaceRepository.java
...amework/apollo/biz/repository/AppNamespaceRepository.java
+2
-0
apollo-common/src/main/java/com/ctrip/framework/apollo/common/config/RefreshableConfig.java
...rip/framework/apollo/common/config/RefreshableConfig.java
+4
-4
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/ConfigServiceAutoConfiguration.java
.../apollo/configservice/ConfigServiceAutoConfiguration.java
+5
-0
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationController.java
...ollo/configservice/controller/NotificationController.java
+2
-2
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java
...lo/configservice/controller/NotificationControllerV2.java
+2
-2
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/service/AppNamespaceServiceWithCache.java
...o/configservice/service/AppNamespaceServiceWithCache.java
+252
-0
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/service/ReleaseMessageServiceWithCache.java
...configservice/service/ReleaseMessageServiceWithCache.java
+186
-0
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/util/WatchKeysUtil.java
...ip/framework/apollo/configservice/util/WatchKeysUtil.java
+2
-2
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/AllTests.java
...va/com/ctrip/framework/apollo/configservice/AllTests.java
+4
-1
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerTest.java
.../configservice/controller/NotificationControllerTest.java
+2
-2
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java
...onfigservice/controller/NotificationControllerV2Test.java
+2
-2
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/integration/NotificationControllerIntegrationTest.java
...ce/integration/NotificationControllerIntegrationTest.java
+9
-0
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/integration/NotificationControllerV2IntegrationTest.java
.../integration/NotificationControllerV2IntegrationTest.java
+7
-0
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/service/AppNamespaceServiceWithCacheTest.java
...nfigservice/service/AppNamespaceServiceWithCacheTest.java
+211
-0
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/service/ReleaseMessageServiceWithCacheTest.java
...igservice/service/ReleaseMessageServiceWithCacheTest.java
+243
-0
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/util/WatchKeysUtilTest.java
...ramework/apollo/configservice/util/WatchKeysUtilTest.java
+2
-2
未找到文件。
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/config/BizConfig.java
浏览文件 @
4949eb55
...
...
@@ -16,6 +16,7 @@ import java.lang.reflect.Type;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.TimeUnit
;
@Component
public
class
BizConfig
extends
RefreshableConfig
{
...
...
@@ -80,5 +81,28 @@ public class BizConfig extends RefreshableConfig {
return
getValue
(
"clogging.server.port"
);
}
public
int
appNamespaceCacheScanInterval
()
{
return
getIntProperty
(
"apollo.app-namespace-cache-scan.interval"
,
1
);
}
public
TimeUnit
appNamespaceCacheScanIntervalTimeUnit
()
{
return
TimeUnit
.
SECONDS
;
}
public
int
appNamespaceCacheRebuildInterval
()
{
return
getIntProperty
(
"apollo.app-namespace-cache-rebuild.interval"
,
60
);
}
public
TimeUnit
appNamespaceCacheRebuildIntervalTimeUnit
()
{
return
TimeUnit
.
SECONDS
;
}
public
int
releaseMessageCacheScanInterval
()
{
return
getIntProperty
(
"apollo.release-message-cache-scan.interval"
,
1
);
}
public
TimeUnit
releaseMessageCacheScanIntervalTimeUnit
()
{
return
TimeUnit
.
SECONDS
;
}
}
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/grayReleaseRule/GrayReleaseRulesHolder.java
浏览文件 @
4949eb55
...
...
@@ -30,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.util.CollectionUtils
;
import
java.util.List
;
import
java.util.Objects
;
import
java.util.Set
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
...
...
@@ -250,12 +249,7 @@ public class GrayReleaseRulesHolder implements ReleaseMessageListener, Initializ
}
private
void
populateDataBaseInterval
()
{
try
{
databaseScanInterval
=
bizConfig
.
grayReleaseRuleScanInterval
();
}
catch
(
Throwable
ex
)
{
Tracer
.
logError
(
ex
);
logger
.
error
(
"Load apollo gray release rule scan interval from server config failed"
,
ex
);
}
databaseScanInterval
=
bizConfig
.
grayReleaseRuleScanInterval
();
}
private
int
getDatabaseScanIntervalSecond
()
{
...
...
apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/repository/AppNamespaceRepository.java
浏览文件 @
4949eb55
...
...
@@ -20,4 +20,6 @@ public interface AppNamespaceRepository extends PagingAndSortingRepository<AppNa
List
<
AppNamespace
>
findByAppIdAndIsPublic
(
String
appId
,
boolean
isPublic
);
List
<
AppNamespace
>
findFirst500ByIdGreaterThanOrderByIdAsc
(
long
id
);
}
apollo-common/src/main/java/com/ctrip/framework/apollo/common/config/RefreshableConfig.java
浏览文件 @
4949eb55
...
...
@@ -75,7 +75,7 @@ public abstract class RefreshableConfig {
try
{
String
value
=
getValue
(
key
);
return
value
==
null
?
defaultValue
:
Integer
.
parseInt
(
value
);
}
catch
(
Exception
e
)
{
}
catch
(
Throwable
e
)
{
Tracer
.
logError
(
"Get int property failed."
,
e
);
return
defaultValue
;
}
...
...
@@ -85,7 +85,7 @@ public abstract class RefreshableConfig {
try
{
String
value
=
getValue
(
key
);
return
value
==
null
?
defaultValue
:
"true"
.
equals
(
value
);
}
catch
(
Exception
e
)
{
}
catch
(
Throwable
e
)
{
Tracer
.
logError
(
"Get boolean property failed."
,
e
);
return
defaultValue
;
}
...
...
@@ -95,7 +95,7 @@ public abstract class RefreshableConfig {
try
{
String
value
=
getValue
(
key
);
return
Strings
.
isNullOrEmpty
(
value
)
?
defaultValue
:
value
.
split
(
LIST_SEPARATOR
);
}
catch
(
Exception
e
)
{
}
catch
(
Throwable
e
)
{
Tracer
.
logError
(
"Get array property failed."
,
e
);
return
defaultValue
;
}
...
...
@@ -104,7 +104,7 @@ public abstract class RefreshableConfig {
public
String
getValue
(
String
key
,
String
defaultValue
)
{
try
{
return
environment
.
getProperty
(
key
,
defaultValue
);
}
catch
(
Exception
e
)
{
}
catch
(
Throwable
e
)
{
Tracer
.
logError
(
"Get value failed."
,
e
);
return
defaultValue
;
}
...
...
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/ConfigServiceAutoConfiguration.java
浏览文件 @
4949eb55
...
...
@@ -5,6 +5,7 @@ import com.ctrip.framework.apollo.biz.message.ReleaseMessageScanner;
import
com.ctrip.framework.apollo.configservice.controller.ConfigFileController
;
import
com.ctrip.framework.apollo.configservice.controller.NotificationController
;
import
com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
...
...
@@ -30,10 +31,14 @@ public class ConfigServiceAutoConfiguration {
private
NotificationControllerV2
notificationControllerV2
;
@Autowired
private
GrayReleaseRulesHolder
grayReleaseRulesHolder
;
@Autowired
private
ReleaseMessageServiceWithCache
releaseMessageServiceWithCache
;
@Bean
public
ReleaseMessageScanner
releaseMessageScanner
()
{
ReleaseMessageScanner
releaseMessageScanner
=
new
ReleaseMessageScanner
();
//0. handle release message cache
releaseMessageScanner
.
addMessageListener
(
releaseMessageServiceWithCache
);
//1. handle gray release rule
releaseMessageScanner
.
addMessageListener
(
grayReleaseRulesHolder
);
//2. handle server cache
...
...
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationController.java
浏览文件 @
4949eb55
...
...
@@ -10,8 +10,8 @@ import com.google.common.collect.Multimaps;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.ReleaseMessageListener
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.service.ReleaseMessageService
;
import
com.ctrip.framework.apollo.biz.utils.EntityManagerUtil
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
;
import
com.ctrip.framework.apollo.configservice.util.NamespaceUtil
;
import
com.ctrip.framework.apollo.configservice.util.WatchKeysUtil
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
...
...
@@ -51,7 +51,7 @@ public class NotificationController implements ReleaseMessageListener {
private
WatchKeysUtil
watchKeysUtil
;
@Autowired
private
ReleaseMessageService
releaseMessageService
;
private
ReleaseMessageService
WithCache
releaseMessageService
;
@Autowired
private
EntityManagerUtil
entityManagerUtil
;
...
...
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java
浏览文件 @
4949eb55
...
...
@@ -15,9 +15,9 @@ import com.google.gson.reflect.TypeToken;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.ReleaseMessageListener
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.service.ReleaseMessageService
;
import
com.ctrip.framework.apollo.biz.utils.EntityManagerUtil
;
import
com.ctrip.framework.apollo.common.exception.BadRequestException
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
;
import
com.ctrip.framework.apollo.configservice.util.NamespaceUtil
;
import
com.ctrip.framework.apollo.configservice.util.WatchKeysUtil
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
...
...
@@ -65,7 +65,7 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
private
WatchKeysUtil
watchKeysUtil
;
@Autowired
private
ReleaseMessageService
releaseMessageService
;
private
ReleaseMessageService
WithCache
releaseMessageService
;
@Autowired
private
EntityManagerUtil
entityManagerUtil
;
...
...
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/service/AppNamespaceServiceWithCache.java
0 → 100644
浏览文件 @
4949eb55
package
com.ctrip.framework.apollo.configservice.service
;
import
com.google.common.base.Joiner
;
import
com.google.common.base.Preconditions
;
import
com.google.common.base.Strings
;
import
com.google.common.collect.Lists
;
import
com.google.common.collect.Maps
;
import
com.google.common.collect.Sets
;
import
com.ctrip.framework.apollo.biz.config.BizConfig
;
import
com.ctrip.framework.apollo.biz.repository.AppNamespaceRepository
;
import
com.ctrip.framework.apollo.common.entity.AppNamespace
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
import
com.ctrip.framework.apollo.core.utils.ApolloThreadFactory
;
import
com.ctrip.framework.apollo.tracer.Tracer
;
import
com.ctrip.framework.apollo.tracer.spi.Transaction
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.InitializingBean
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.util.CollectionUtils
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public
class
AppNamespaceServiceWithCache
implements
InitializingBean
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
AppNamespaceServiceWithCache
.
class
);
private
static
final
Joiner
STRING_JOINER
=
Joiner
.
on
(
ConfigConsts
.
CLUSTER_NAMESPACE_SEPARATOR
)
.
skipNulls
();
@Autowired
private
AppNamespaceRepository
appNamespaceRepository
;
@Autowired
private
BizConfig
bizConfig
;
private
int
scanInterval
;
private
TimeUnit
scanIntervalTimeUnit
;
private
int
rebuildInterval
;
private
TimeUnit
rebuildIntervalTimeUnit
;
private
ScheduledExecutorService
scheduledExecutorService
;
private
long
maxIdScanned
;
//store namespaceName -> AppNamespace
private
Map
<
String
,
AppNamespace
>
publicAppNamespaceCache
;
//store appId+namespaceName -> AppNamespace
private
Map
<
String
,
AppNamespace
>
appNamespaceCache
;
//store id -> AppNamespace
private
Map
<
Long
,
AppNamespace
>
appNamespaceIdCache
;
public
AppNamespaceServiceWithCache
()
{
maxIdScanned
=
0
;
publicAppNamespaceCache
=
Maps
.
newConcurrentMap
();
appNamespaceCache
=
Maps
.
newConcurrentMap
();
appNamespaceIdCache
=
Maps
.
newConcurrentMap
();
scheduledExecutorService
=
Executors
.
newScheduledThreadPool
(
1
,
ApolloThreadFactory
.
create
(
"AppNamespaceServiceWithCache"
,
true
));
}
public
List
<
AppNamespace
>
findByAppIdAndNamespaces
(
String
appId
,
Set
<
String
>
namespaceNames
)
{
Preconditions
.
checkArgument
(!
Strings
.
isNullOrEmpty
(
appId
),
"appId must not be null"
);
if
(
namespaceNames
==
null
||
namespaceNames
.
isEmpty
())
{
return
Collections
.
emptyList
();
}
// return appNamespaceRepository.findByAppIdAndNameIn(appId, namespaceNames);
List
<
AppNamespace
>
result
=
Lists
.
newArrayList
();
for
(
String
namespaceName
:
namespaceNames
)
{
AppNamespace
appNamespace
=
appNamespaceCache
.
get
(
STRING_JOINER
.
join
(
appId
,
namespaceName
));
if
(
appNamespace
!=
null
)
{
result
.
add
(
appNamespace
);
}
}
return
result
;
}
public
List
<
AppNamespace
>
findPublicNamespacesByNames
(
Set
<
String
>
namespaceNames
)
{
if
(
namespaceNames
==
null
||
namespaceNames
.
isEmpty
())
{
return
Collections
.
emptyList
();
}
// return appNamespaceRepository.findByNameInAndIsPublicTrue(namespaceNames);
List
<
AppNamespace
>
result
=
Lists
.
newArrayList
();
for
(
String
namespaceName
:
namespaceNames
)
{
AppNamespace
appNamespace
=
publicAppNamespaceCache
.
get
(
namespaceName
);
if
(
appNamespace
!=
null
)
{
result
.
add
(
appNamespace
);
}
}
return
result
;
}
@Override
public
void
afterPropertiesSet
()
throws
Exception
{
populateDataBaseInterval
();
scanNewAppNamespaces
();
//block the startup process until load finished
scheduledExecutorService
.
scheduleAtFixedRate
(()
->
{
Transaction
transaction
=
Tracer
.
newTransaction
(
"Apollo.AppNamespaceServiceWithCache"
,
"rebuildCache"
);
try
{
this
.
updateAndDeleteCache
();
transaction
.
setStatus
(
Transaction
.
SUCCESS
);
}
catch
(
Throwable
ex
)
{
transaction
.
setStatus
(
ex
);
logger
.
error
(
"Rebuild cache failed"
,
ex
);
}
finally
{
transaction
.
complete
();
}
},
rebuildInterval
,
rebuildInterval
,
rebuildIntervalTimeUnit
);
scheduledExecutorService
.
scheduleWithFixedDelay
(
this
::
scanNewAppNamespaces
,
scanInterval
,
scanInterval
,
scanIntervalTimeUnit
);
}
private
void
scanNewAppNamespaces
()
{
Transaction
transaction
=
Tracer
.
newTransaction
(
"Apollo.AppNamespaceServiceWithCache"
,
"scanNewAppNamespaces"
);
try
{
this
.
loadNewAppNamespaces
();
transaction
.
setStatus
(
Transaction
.
SUCCESS
);
}
catch
(
Throwable
ex
)
{
transaction
.
setStatus
(
ex
);
logger
.
error
(
"Load new app namespaces failed"
,
ex
);
}
finally
{
transaction
.
complete
();
}
}
//for those new app namespaces
private
void
loadNewAppNamespaces
()
{
boolean
hasMore
=
true
;
while
(
hasMore
&&
!
Thread
.
currentThread
().
isInterrupted
())
{
//current batch is 500
List
<
AppNamespace
>
appNamespaces
=
appNamespaceRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
maxIdScanned
);
if
(
CollectionUtils
.
isEmpty
(
appNamespaces
))
{
break
;
}
mergeAppNamespaces
(
appNamespaces
);
int
scanned
=
appNamespaces
.
size
();
maxIdScanned
=
appNamespaces
.
get
(
scanned
-
1
).
getId
();
hasMore
=
scanned
==
500
;
logger
.
info
(
"Loaded {} new app namespaces with startId {}"
,
scanned
,
maxIdScanned
);
}
}
private
void
mergeAppNamespaces
(
List
<
AppNamespace
>
appNamespaces
)
{
for
(
AppNamespace
appNamespace
:
appNamespaces
)
{
appNamespaceCache
.
put
(
assembleAppNamespaceKey
(
appNamespace
),
appNamespace
);
appNamespaceIdCache
.
put
(
appNamespace
.
getId
(),
appNamespace
);
if
(
appNamespace
.
isPublic
())
{
publicAppNamespaceCache
.
put
(
appNamespace
.
getName
(),
appNamespace
);
}
}
}
//for those updated or deleted app namespaces
private
void
updateAndDeleteCache
()
{
List
<
Long
>
ids
=
Lists
.
newArrayList
(
appNamespaceIdCache
.
keySet
());
if
(
CollectionUtils
.
isEmpty
(
ids
))
{
return
;
}
List
<
List
<
Long
>>
partitionIds
=
Lists
.
partition
(
ids
,
500
);
for
(
List
<
Long
>
toRebuild
:
partitionIds
)
{
Iterable
<
AppNamespace
>
appNamespaces
=
appNamespaceRepository
.
findAll
(
toRebuild
);
if
(
appNamespaces
==
null
)
{
continue
;
}
//handle updated
Set
<
Long
>
foundIds
=
handleUpdatedAppNamespaces
(
appNamespaces
);
//handle deleted
handleDeletedAppNamespaces
(
Sets
.
difference
(
Sets
.
newHashSet
(
toRebuild
),
foundIds
));
}
}
//for those updated app namespaces
private
Set
<
Long
>
handleUpdatedAppNamespaces
(
Iterable
<
AppNamespace
>
appNamespaces
)
{
Set
<
Long
>
foundIds
=
Sets
.
newHashSet
();
for
(
AppNamespace
appNamespace
:
appNamespaces
)
{
foundIds
.
add
(
appNamespace
.
getId
());
AppNamespace
thatInCache
=
appNamespaceIdCache
.
get
(
appNamespace
.
getId
());
if
(
thatInCache
!=
null
&&
appNamespace
.
getDataChangeLastModifiedTime
().
after
(
thatInCache
.
getDataChangeLastModifiedTime
()))
{
appNamespaceIdCache
.
put
(
appNamespace
.
getId
(),
appNamespace
);
String
oldKey
=
assembleAppNamespaceKey
(
thatInCache
);
String
newKey
=
assembleAppNamespaceKey
(
appNamespace
);
appNamespaceCache
.
put
(
newKey
,
appNamespace
);
//in case appId or namespaceName changes
if
(!
newKey
.
equals
(
oldKey
))
{
appNamespaceCache
.
remove
(
oldKey
);
}
if
(
appNamespace
.
isPublic
())
{
publicAppNamespaceCache
.
put
(
appNamespace
.
getName
(),
appNamespace
);
//in case namespaceName changes
if
(!
appNamespace
.
getName
().
equals
(
thatInCache
.
getName
())
&&
thatInCache
.
isPublic
())
{
publicAppNamespaceCache
.
remove
(
thatInCache
.
getName
());
}
}
else
if
(
thatInCache
.
isPublic
())
{
//just in case isPublic changes
publicAppNamespaceCache
.
remove
(
thatInCache
.
getName
());
}
logger
.
info
(
"Found AppNamespace changes, old: {}, new: {}"
,
thatInCache
,
appNamespace
);
}
}
return
foundIds
;
}
//for those deleted app namespaces
private
void
handleDeletedAppNamespaces
(
Set
<
Long
>
deletedIds
)
{
if
(
CollectionUtils
.
isEmpty
(
deletedIds
))
{
return
;
}
for
(
Long
deletedId
:
deletedIds
)
{
AppNamespace
deleted
=
appNamespaceIdCache
.
remove
(
deletedId
);
if
(
deleted
==
null
)
{
continue
;
}
appNamespaceCache
.
remove
(
assembleAppNamespaceKey
(
deleted
));
if
(
deleted
.
isPublic
())
{
publicAppNamespaceCache
.
remove
(
deleted
.
getName
());
}
logger
.
info
(
"Found AppNamespace deleted, {}"
,
deleted
);
}
}
private
String
assembleAppNamespaceKey
(
AppNamespace
appNamespace
)
{
return
STRING_JOINER
.
join
(
appNamespace
.
getAppId
(),
appNamespace
.
getName
());
}
private
void
populateDataBaseInterval
()
{
scanInterval
=
bizConfig
.
appNamespaceCacheScanInterval
();
scanIntervalTimeUnit
=
bizConfig
.
appNamespaceCacheScanIntervalTimeUnit
();
rebuildInterval
=
bizConfig
.
appNamespaceCacheRebuildInterval
();
rebuildIntervalTimeUnit
=
bizConfig
.
appNamespaceCacheRebuildIntervalTimeUnit
();
}
}
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/service/ReleaseMessageServiceWithCache.java
0 → 100644
浏览文件 @
4949eb55
package
com.ctrip.framework.apollo.configservice.service
;
import
com.google.common.base.Strings
;
import
com.google.common.collect.Lists
;
import
com.google.common.collect.Maps
;
import
com.ctrip.framework.apollo.biz.config.BizConfig
;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.ReleaseMessageListener
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository
;
import
com.ctrip.framework.apollo.core.utils.ApolloThreadFactory
;
import
com.ctrip.framework.apollo.tracer.Tracer
;
import
com.ctrip.framework.apollo.tracer.spi.Transaction
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.InitializingBean
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.util.CollectionUtils
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicBoolean
;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public
class
ReleaseMessageServiceWithCache
implements
ReleaseMessageListener
,
InitializingBean
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ReleaseMessageServiceWithCache
.
class
);
@Autowired
private
ReleaseMessageRepository
releaseMessageRepository
;
@Autowired
private
BizConfig
bizConfig
;
private
int
scanInterval
;
private
TimeUnit
scanIntervalTimeUnit
;
private
volatile
long
maxIdScanned
;
private
ConcurrentMap
<
String
,
ReleaseMessage
>
releaseMessageCache
;
private
AtomicBoolean
doScan
;
private
ExecutorService
executorService
;
public
ReleaseMessageServiceWithCache
()
{
initialize
();
}
private
void
initialize
()
{
releaseMessageCache
=
Maps
.
newConcurrentMap
();
doScan
=
new
AtomicBoolean
(
true
);
executorService
=
Executors
.
newSingleThreadExecutor
(
ApolloThreadFactory
.
create
(
"ReleaseMessageServiceWithCache"
,
true
));
}
public
ReleaseMessage
findLatestReleaseMessageForMessages
(
Set
<
String
>
messages
)
{
if
(
CollectionUtils
.
isEmpty
(
messages
))
{
return
null
;
}
long
maxReleaseMessageId
=
0
;
ReleaseMessage
result
=
null
;
for
(
String
message
:
messages
)
{
ReleaseMessage
releaseMessage
=
releaseMessageCache
.
get
(
message
);
if
(
releaseMessage
!=
null
&&
releaseMessage
.
getId
()
>
maxReleaseMessageId
)
{
maxReleaseMessageId
=
releaseMessage
.
getId
();
result
=
releaseMessage
;
}
}
return
result
;
}
public
List
<
ReleaseMessage
>
findLatestReleaseMessagesGroupByMessages
(
Set
<
String
>
messages
)
{
if
(
CollectionUtils
.
isEmpty
(
messages
))
{
return
Collections
.
emptyList
();
}
List
<
ReleaseMessage
>
releaseMessages
=
Lists
.
newArrayList
();
for
(
String
message
:
messages
)
{
ReleaseMessage
releaseMessage
=
releaseMessageCache
.
get
(
message
);
if
(
releaseMessage
!=
null
)
{
releaseMessages
.
add
(
releaseMessage
);
}
}
return
releaseMessages
;
}
@Override
public
void
handleMessage
(
ReleaseMessage
message
,
String
channel
)
{
//Could stop once the ReleaseMessageScanner starts to work
doScan
.
set
(
false
);
logger
.
info
(
"message received - channel: {}, message: {}"
,
channel
,
message
);
String
content
=
message
.
getMessage
();
Tracer
.
logEvent
(
"Apollo.ReleaseMessageService.UpdateCache"
,
String
.
valueOf
(
message
.
getId
()));
if
(!
Topics
.
APOLLO_RELEASE_TOPIC
.
equals
(
channel
)
||
Strings
.
isNullOrEmpty
(
content
))
{
return
;
}
long
gap
=
message
.
getId
()
-
maxIdScanned
;
if
(
gap
==
1
)
{
mergeReleaseMessage
(
message
);
}
else
if
(
gap
>
1
)
{
//gap found!
loadReleaseMessages
(
maxIdScanned
);
}
}
@Override
public
void
afterPropertiesSet
()
throws
Exception
{
populateDataBaseInterval
();
//block the startup process until load finished
//this should happen before ReleaseMessageScanner due to autowire
loadReleaseMessages
(
0
);
executorService
.
submit
(()
->
{
while
(
doScan
.
get
()
&&
!
Thread
.
currentThread
().
isInterrupted
())
{
Transaction
transaction
=
Tracer
.
newTransaction
(
"Apollo.ReleaseMessageServiceWithCache"
,
"scanNewReleaseMessages"
);
try
{
loadReleaseMessages
(
maxIdScanned
);
transaction
.
setStatus
(
Transaction
.
SUCCESS
);
}
catch
(
Throwable
ex
)
{
transaction
.
setStatus
(
ex
);
logger
.
error
(
"Scan new release messages failed"
,
ex
);
}
finally
{
transaction
.
complete
();
}
try
{
scanIntervalTimeUnit
.
sleep
(
scanInterval
);
}
catch
(
InterruptedException
e
)
{
//ignore
}
}
});
}
private
synchronized
void
mergeReleaseMessage
(
ReleaseMessage
releaseMessage
)
{
ReleaseMessage
old
=
releaseMessageCache
.
get
(
releaseMessage
.
getMessage
());
if
(
old
==
null
||
releaseMessage
.
getId
()
>
old
.
getId
())
{
releaseMessageCache
.
put
(
releaseMessage
.
getMessage
(),
releaseMessage
);
maxIdScanned
=
releaseMessage
.
getId
();
}
}
private
void
loadReleaseMessages
(
long
startId
)
{
boolean
hasMore
=
true
;
while
(
hasMore
&&
!
Thread
.
currentThread
().
isInterrupted
())
{
//current batch is 500
List
<
ReleaseMessage
>
releaseMessages
=
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
startId
);
if
(
CollectionUtils
.
isEmpty
(
releaseMessages
))
{
break
;
}
releaseMessages
.
forEach
(
this
::
mergeReleaseMessage
);
int
scanned
=
releaseMessages
.
size
();
startId
=
releaseMessages
.
get
(
scanned
-
1
).
getId
();
hasMore
=
scanned
==
500
;
logger
.
info
(
"Loaded {} release messages with startId {}"
,
scanned
,
startId
);
}
}
private
void
populateDataBaseInterval
()
{
scanInterval
=
bizConfig
.
releaseMessageCacheScanInterval
();
scanIntervalTimeUnit
=
bizConfig
.
releaseMessageCacheScanIntervalTimeUnit
();
}
//only for test use
private
void
reset
()
throws
Exception
{
executorService
.
shutdownNow
();
initialize
();
afterPropertiesSet
();
}
}
apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/util/WatchKeysUtil.java
浏览文件 @
4949eb55
...
...
@@ -7,7 +7,7 @@ import com.google.common.collect.HashMultimap;
import
com.google.common.collect.Multimap
;
import
com.google.common.collect.Sets
;
import
com.ctrip.framework.apollo.
biz.service.AppNamespaceServic
e
;
import
com.ctrip.framework.apollo.
configservice.service.AppNamespaceServiceWithCach
e
;
import
com.ctrip.framework.apollo.common.entity.AppNamespace
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
...
...
@@ -26,7 +26,7 @@ import java.util.Set;
public
class
WatchKeysUtil
{
private
static
final
Joiner
STRING_JOINER
=
Joiner
.
on
(
ConfigConsts
.
CLUSTER_NAMESPACE_SEPARATOR
);
@Autowired
private
AppNamespaceService
appNamespaceService
;
private
AppNamespaceService
WithCache
appNamespaceService
;
/**
* Assemble watch keys for the given appId, cluster, namespace, dataCenter combination
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/AllTests.java
浏览文件 @
4949eb55
...
...
@@ -8,6 +8,8 @@ import com.ctrip.framework.apollo.configservice.integration.ConfigControllerInte
import
com.ctrip.framework.apollo.configservice.integration.ConfigFileControllerIntegrationTest
;
import
com.ctrip.framework.apollo.configservice.integration.NotificationControllerIntegrationTest
;
import
com.ctrip.framework.apollo.configservice.integration.NotificationControllerV2IntegrationTest
;
import
com.ctrip.framework.apollo.configservice.service.AppNamespaceServiceWithCacheTest
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCacheTest
;
import
com.ctrip.framework.apollo.configservice.util.InstanceConfigAuditUtilTest
;
import
com.ctrip.framework.apollo.configservice.util.NamespaceUtilTest
;
import
com.ctrip.framework.apollo.configservice.util.WatchKeysUtilTest
;
...
...
@@ -22,7 +24,8 @@ import org.junit.runners.Suite.SuiteClasses;
NamespaceUtilTest
.
class
,
ConfigFileControllerTest
.
class
,
ConfigFileControllerIntegrationTest
.
class
,
WatchKeysUtilTest
.
class
,
NotificationControllerV2Test
.
class
,
NotificationControllerV2IntegrationTest
.
class
,
InstanceConfigAuditUtilTest
.
class
InstanceConfigAuditUtilTest
.
class
,
AppNamespaceServiceWithCacheTest
.
class
,
ReleaseMessageServiceWithCacheTest
.
class
})
public
class
AllTests
{
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerTest.java
浏览文件 @
4949eb55
...
...
@@ -6,8 +6,8 @@ import com.google.common.collect.Sets;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.service.ReleaseMessageService
;
import
com.ctrip.framework.apollo.biz.utils.EntityManagerUtil
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
;
import
com.ctrip.framework.apollo.configservice.util.NamespaceUtil
;
import
com.ctrip.framework.apollo.configservice.util.WatchKeysUtil
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
...
...
@@ -45,7 +45,7 @@ public class NotificationControllerTest {
private
long
someNotificationId
;
private
String
someClientIp
;
@Mock
private
ReleaseMessageService
releaseMessageService
;
private
ReleaseMessageService
WithCache
releaseMessageService
;
@Mock
private
EntityManagerUtil
entityManagerUtil
;
@Mock
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java
浏览文件 @
4949eb55
...
...
@@ -9,8 +9,8 @@ import com.google.gson.Gson;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.service.ReleaseMessageService
;
import
com.ctrip.framework.apollo.biz.utils.EntityManagerUtil
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
;
import
com.ctrip.framework.apollo.configservice.util.NamespaceUtil
;
import
com.ctrip.framework.apollo.configservice.util.WatchKeysUtil
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
...
...
@@ -50,7 +50,7 @@ public class NotificationControllerV2Test {
private
long
someNotificationId
;
private
String
someClientIp
;
@Mock
private
ReleaseMessageService
releaseMessageService
;
private
ReleaseMessageService
WithCache
releaseMessageService
;
@Mock
private
EntityManagerUtil
entityManagerUtil
;
@Mock
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/integration/NotificationControllerIntegrationTest.java
浏览文件 @
4949eb55
...
...
@@ -2,14 +2,19 @@ package com.ctrip.framework.apollo.configservice.integration;
import
com.google.common.base.Joiner
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
import
com.ctrip.framework.apollo.core.dto.ApolloConfigNotification
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.test.annotation.DirtiesContext
;
import
org.springframework.test.context.jdbc.Sql
;
import
org.springframework.test.util.ReflectionTestUtils
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
...
...
@@ -28,8 +33,12 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
private
String
somePublicNamespace
;
private
ExecutorService
executorService
;
@Autowired
private
ReleaseMessageServiceWithCache
releaseMessageServiceWithCache
;
@Before
public
void
setUp
()
throws
Exception
{
ReflectionTestUtils
.
invokeMethod
(
releaseMessageServiceWithCache
,
"reset"
);
someAppId
=
"someAppId"
;
someCluster
=
ConfigConsts
.
CLUSTER_NAME_DEFAULT
;
defaultNamespace
=
ConfigConsts
.
NAMESPACE_APPLICATION
;
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/integration/NotificationControllerV2IntegrationTest.java
浏览文件 @
4949eb55
...
...
@@ -5,9 +5,11 @@ import com.google.common.collect.Lists;
import
com.google.common.collect.Sets
;
import
com.google.gson.Gson
;
import
com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
import
com.ctrip.framework.apollo.core.dto.ApolloConfigNotification
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -16,6 +18,7 @@ import org.springframework.http.HttpMethod;
import
org.springframework.http.HttpStatus
;
import
org.springframework.http.ResponseEntity
;
import
org.springframework.test.context.jdbc.Sql
;
import
org.springframework.test.util.ReflectionTestUtils
;
import
java.util.List
;
import
java.util.Set
;
...
...
@@ -33,6 +36,9 @@ public class NotificationControllerV2IntegrationTest extends AbstractBaseIntegra
@Autowired
private
Gson
gson
;
@Autowired
private
ReleaseMessageServiceWithCache
releaseMessageServiceWithCache
;
private
String
someAppId
;
private
String
someCluster
;
private
String
defaultNamespace
;
...
...
@@ -42,6 +48,7 @@ public class NotificationControllerV2IntegrationTest extends AbstractBaseIntegra
@Before
public
void
setUp
()
throws
Exception
{
ReflectionTestUtils
.
invokeMethod
(
releaseMessageServiceWithCache
,
"reset"
);
someAppId
=
"someAppId"
;
someCluster
=
ConfigConsts
.
CLUSTER_NAME_DEFAULT
;
defaultNamespace
=
ConfigConsts
.
NAMESPACE_APPLICATION
;
...
...
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/service/AppNamespaceServiceWithCacheTest.java
0 → 100644
浏览文件 @
4949eb55
package
com.ctrip.framework.apollo.configservice.service
;
import
com.google.common.collect.Lists
;
import
com.google.common.collect.Sets
;
import
com.ctrip.framework.apollo.biz.config.BizConfig
;
import
com.ctrip.framework.apollo.biz.repository.AppNamespaceRepository
;
import
com.ctrip.framework.apollo.common.entity.AppNamespace
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.runners.MockitoJUnitRunner
;
import
org.springframework.test.util.ReflectionTestUtils
;
import
java.util.Calendar
;
import
java.util.Collections
;
import
java.util.Comparator
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.concurrent.TimeUnit
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
junit
.
Assert
.
assertTrue
;
import
static
org
.
mockito
.
Mockito
.
when
;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
AppNamespaceServiceWithCacheTest
{
private
AppNamespaceServiceWithCache
appNamespaceServiceWithCache
;
@Mock
private
AppNamespaceRepository
appNamespaceRepository
;
@Mock
private
BizConfig
bizConfig
;
private
int
scanInterval
;
private
TimeUnit
scanIntervalTimeUnit
;
private
Comparator
<
AppNamespace
>
appNamespaceComparator
=
(
o1
,
o2
)
->
(
int
)
(
o1
.
getId
()
-
o2
.
getId
());
@Before
public
void
setUp
()
throws
Exception
{
appNamespaceServiceWithCache
=
new
AppNamespaceServiceWithCache
();
ReflectionTestUtils
.
setField
(
appNamespaceServiceWithCache
,
"appNamespaceRepository"
,
appNamespaceRepository
);
ReflectionTestUtils
.
setField
(
appNamespaceServiceWithCache
,
"bizConfig"
,
bizConfig
);
scanInterval
=
10
;
scanIntervalTimeUnit
=
TimeUnit
.
MILLISECONDS
;
when
(
bizConfig
.
appNamespaceCacheRebuildInterval
()).
thenReturn
(
scanInterval
);
when
(
bizConfig
.
appNamespaceCacheRebuildIntervalTimeUnit
()).
thenReturn
(
scanIntervalTimeUnit
);
when
(
bizConfig
.
appNamespaceCacheScanInterval
()).
thenReturn
(
scanInterval
);
when
(
bizConfig
.
appNamespaceCacheScanIntervalTimeUnit
()).
thenReturn
(
scanIntervalTimeUnit
);
}
@Test
public
void
testAppNamespace
()
throws
Exception
{
String
someAppId
=
"someAppId"
;
String
somePrivateNamespace
=
"somePrivateNamespace"
;
long
somePrivateNamespaceId
=
1
;
String
yetAnotherPrivateNamespace
=
"anotherPrivateNamespace"
;
long
yetAnotherPrivateNamespaceId
=
4
;
String
anotherPublicNamespace
=
"anotherPublicNamespace"
;
long
anotherPublicNamespaceId
=
5
;
String
somePublicAppId
=
"somePublicAppId"
;
String
somePublicNamespace
=
"somePublicNamespace"
;
long
somePublicNamespaceId
=
2
;
String
anotherPrivateNamespace
=
"anotherPrivateNamespace"
;
long
anotherPrivateNamespaceId
=
3
;
AppNamespace
somePrivateAppNamespace
=
assembleAppNamespace
(
somePrivateNamespaceId
,
someAppId
,
somePrivateNamespace
,
false
);
AppNamespace
somePublicAppNamespace
=
assembleAppNamespace
(
somePublicNamespaceId
,
somePublicAppId
,
somePublicNamespace
,
true
);
AppNamespace
anotherPrivateAppNamespace
=
assembleAppNamespace
(
anotherPrivateNamespaceId
,
somePublicAppId
,
anotherPrivateNamespace
,
false
);
AppNamespace
yetAnotherPrivateAppNamespace
=
assembleAppNamespace
(
yetAnotherPrivateNamespaceId
,
someAppId
,
yetAnotherPrivateNamespace
,
false
);
AppNamespace
anotherPublicAppNamespace
=
assembleAppNamespace
(
anotherPublicNamespaceId
,
someAppId
,
anotherPublicNamespace
,
true
);
Set
<
String
>
someAppIdNamespaces
=
Sets
.
newHashSet
(
somePrivateNamespace
,
yetAnotherPrivateNamespace
,
anotherPublicNamespace
);
Set
<
String
>
somePublicAppIdNamespaces
=
Sets
.
newHashSet
(
somePublicNamespace
,
anotherPrivateNamespace
);
Set
<
String
>
publicNamespaces
=
Sets
.
newHashSet
(
somePublicNamespace
,
anotherPublicNamespace
);
List
<
Long
>
appNamespaceIds
=
Lists
.
newArrayList
(
somePrivateNamespaceId
,
somePublicNamespaceId
,
anotherPrivateNamespaceId
,
yetAnotherPrivateNamespaceId
,
anotherPublicNamespaceId
);
List
<
AppNamespace
>
allAppNamespaces
=
Lists
.
newArrayList
(
somePrivateAppNamespace
,
somePublicAppNamespace
,
anotherPrivateAppNamespace
,
yetAnotherPrivateAppNamespace
,
anotherPublicAppNamespace
);
// Test init
appNamespaceServiceWithCache
.
afterPropertiesSet
();
// Should have no record now
assertTrue
(
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
someAppId
,
someAppIdNamespaces
)
.
isEmpty
());
assertTrue
(
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
somePublicAppId
,
somePublicAppIdNamespaces
).
isEmpty
());
assertTrue
(
appNamespaceServiceWithCache
.
findPublicNamespacesByNames
(
publicNamespaces
).
isEmpty
());
// Add 1 private namespace and 1 public namespace
when
(
appNamespaceRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
0
)).
thenReturn
(
Lists
.
newArrayList
(
somePrivateAppNamespace
,
somePublicAppNamespace
));
when
(
appNamespaceRepository
.
findAll
(
Lists
.
newArrayList
(
somePrivateNamespaceId
,
somePublicNamespaceId
))).
thenReturn
(
Lists
.
newArrayList
(
somePrivateAppNamespace
,
somePublicAppNamespace
));
scanIntervalTimeUnit
.
sleep
(
scanInterval
*
3
);
check
(
Lists
.
newArrayList
(
somePrivateAppNamespace
),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
someAppId
,
someAppIdNamespaces
));
check
(
Lists
.
newArrayList
(
somePublicAppNamespace
),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
somePublicAppId
,
somePublicAppIdNamespaces
));
check
(
Lists
.
newArrayList
(
somePublicAppNamespace
),
appNamespaceServiceWithCache
.
findPublicNamespacesByNames
(
publicNamespaces
));
// Add 2 private namespaces and 1 public namespace
when
(
appNamespaceRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
somePublicNamespaceId
))
.
thenReturn
(
Lists
.
newArrayList
(
anotherPrivateAppNamespace
,
yetAnotherPrivateAppNamespace
,
anotherPublicAppNamespace
));
when
(
appNamespaceRepository
.
findAll
(
appNamespaceIds
)).
thenReturn
(
allAppNamespaces
);
scanIntervalTimeUnit
.
sleep
(
scanInterval
*
3
);
check
(
Lists
.
newArrayList
(
somePrivateAppNamespace
,
yetAnotherPrivateAppNamespace
,
anotherPublicAppNamespace
),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
someAppId
,
someAppIdNamespaces
));
check
(
Lists
.
newArrayList
(
somePublicAppNamespace
,
anotherPrivateAppNamespace
),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
somePublicAppId
,
somePublicAppIdNamespaces
));
check
(
Lists
.
newArrayList
(
somePublicAppNamespace
,
anotherPublicAppNamespace
),
appNamespaceServiceWithCache
.
findPublicNamespacesByNames
(
publicNamespaces
));
// Update name
String
somePrivateNamespaceNew
=
"somePrivateNamespaceNew"
;
AppNamespace
somePrivateAppNamespaceNew
=
assembleAppNamespace
(
somePrivateAppNamespace
.
getId
(),
somePrivateAppNamespace
.
getAppId
(),
somePrivateNamespaceNew
,
somePrivateAppNamespace
.
isPublic
());
somePrivateAppNamespaceNew
.
setDataChangeLastModifiedTime
(
newDateWithDelta
(
somePrivateAppNamespace
.
getDataChangeLastModifiedTime
(),
1
));
// Update appId
String
someAppIdNew
=
"someAppIdNew"
;
AppNamespace
yetAnotherPrivateAppNamespaceNew
=
assembleAppNamespace
(
yetAnotherPrivateAppNamespace
.
getId
(),
someAppIdNew
,
yetAnotherPrivateAppNamespace
.
getName
(),
false
);
yetAnotherPrivateAppNamespaceNew
.
setDataChangeLastModifiedTime
(
newDateWithDelta
(
yetAnotherPrivateAppNamespace
.
getDataChangeLastModifiedTime
(),
1
));
// Update isPublic
AppNamespace
somePublicAppNamespaceNew
=
assembleAppNamespace
(
somePublicAppNamespace
.
getId
(),
somePublicAppNamespace
.
getAppId
(),
somePublicAppNamespace
.
getName
(),
!
somePublicAppNamespace
.
isPublic
());
somePublicAppNamespaceNew
.
setDataChangeLastModifiedTime
(
newDateWithDelta
(
somePublicAppNamespace
.
getDataChangeLastModifiedTime
(),
1
));
// Delete 1 private and 1 public
when
(
appNamespaceRepository
.
findAll
(
appNamespaceIds
)).
thenReturn
(
Lists
.
newArrayList
(
somePrivateAppNamespaceNew
,
yetAnotherPrivateAppNamespaceNew
,
somePublicAppNamespaceNew
));
scanIntervalTimeUnit
.
sleep
(
scanInterval
*
3
);
check
(
Collections
.
emptyList
(),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
someAppId
,
someAppIdNamespaces
));
check
(
Lists
.
newArrayList
(
somePublicAppNamespaceNew
),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
somePublicAppId
,
somePublicAppIdNamespaces
));
check
(
Collections
.
emptyList
(),
appNamespaceServiceWithCache
.
findPublicNamespacesByNames
(
publicNamespaces
));
check
(
Lists
.
newArrayList
(
somePrivateAppNamespaceNew
),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
someAppId
,
Sets
.
newHashSet
(
somePrivateNamespaceNew
)));
check
(
Lists
.
newArrayList
(
yetAnotherPrivateAppNamespaceNew
),
appNamespaceServiceWithCache
.
findByAppIdAndNamespaces
(
someAppIdNew
,
Sets
.
newHashSet
(
yetAnotherPrivateNamespace
)));
}
private
void
check
(
List
<
AppNamespace
>
someList
,
List
<
AppNamespace
>
anotherList
)
{
Collections
.
sort
(
someList
,
appNamespaceComparator
);
Collections
.
sort
(
anotherList
,
appNamespaceComparator
);
assertEquals
(
someList
,
anotherList
);
}
private
Date
newDateWithDelta
(
Date
date
,
int
deltaInSeconds
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTime
(
date
);
calendar
.
add
(
Calendar
.
SECOND
,
deltaInSeconds
);
return
calendar
.
getTime
();
}
private
AppNamespace
assembleAppNamespace
(
long
id
,
String
appId
,
String
name
,
boolean
isPublic
)
{
AppNamespace
appNamespace
=
new
AppNamespace
();
appNamespace
.
setId
(
id
);
appNamespace
.
setAppId
(
appId
);
appNamespace
.
setName
(
name
);
appNamespace
.
setPublic
(
isPublic
);
appNamespace
.
setDataChangeLastModifiedTime
(
new
Date
());
return
appNamespace
;
}
}
\ No newline at end of file
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/service/ReleaseMessageServiceWithCacheTest.java
0 → 100644
浏览文件 @
4949eb55
package
com.ctrip.framework.apollo.configservice.service
;
import
com.google.common.collect.Lists
;
import
com.google.common.collect.Sets
;
import
com.ctrip.framework.apollo.biz.config.BizConfig
;
import
com.ctrip.framework.apollo.biz.entity.ReleaseMessage
;
import
com.ctrip.framework.apollo.biz.message.Topics
;
import
com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.runners.MockitoJUnitRunner
;
import
org.springframework.test.util.ReflectionTestUtils
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.concurrent.TimeUnit
;
import
static
org
.
junit
.
Assert
.*;
import
static
org
.
mockito
.
Mockito
.
times
;
import
static
org
.
mockito
.
Mockito
.
verify
;
import
static
org
.
mockito
.
Mockito
.
when
;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
ReleaseMessageServiceWithCacheTest
{
private
ReleaseMessageServiceWithCache
releaseMessageServiceWithCache
;
@Mock
private
ReleaseMessageRepository
releaseMessageRepository
;
@Mock
private
BizConfig
bizConfig
;
private
int
scanInterval
;
private
TimeUnit
scanIntervalTimeUnit
;
@Before
public
void
setUp
()
throws
Exception
{
releaseMessageServiceWithCache
=
new
ReleaseMessageServiceWithCache
();
ReflectionTestUtils
.
setField
(
releaseMessageServiceWithCache
,
"releaseMessageRepository"
,
releaseMessageRepository
);
ReflectionTestUtils
.
setField
(
releaseMessageServiceWithCache
,
"bizConfig"
,
bizConfig
);
scanInterval
=
10
;
scanIntervalTimeUnit
=
TimeUnit
.
MILLISECONDS
;
when
(
bizConfig
.
releaseMessageCacheScanInterval
()).
thenReturn
(
scanInterval
);
when
(
bizConfig
.
releaseMessageCacheScanIntervalTimeUnit
()).
thenReturn
(
scanIntervalTimeUnit
);
}
@Test
public
void
testWhenNoReleaseMessages
()
throws
Exception
{
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
0L
)).
thenReturn
(
Collections
.
emptyList
());
releaseMessageServiceWithCache
.
afterPropertiesSet
();
String
someMessage
=
"someMessage"
;
String
anotherMessage
=
"anotherMessage"
;
Set
<
String
>
messages
=
Sets
.
newHashSet
(
someMessage
,
anotherMessage
);
assertNull
(
releaseMessageServiceWithCache
.
findLatestReleaseMessageForMessages
(
messages
));
assertTrue
(
releaseMessageServiceWithCache
.
findLatestReleaseMessagesGroupByMessages
(
messages
)
.
isEmpty
());
}
@Test
public
void
testWhenHasReleaseMsgAndHasRepeatMsg
()
throws
Exception
{
String
someMsgContent
=
"msg1"
;
ReleaseMessage
someMsg
=
assembleReleaseMsg
(
1
,
someMsgContent
);
String
anotherMsgContent
=
"msg2"
;
ReleaseMessage
anotherMsg
=
assembleReleaseMsg
(
2
,
anotherMsgContent
);
ReleaseMessage
anotherRepeatMsg
=
assembleReleaseMsg
(
3
,
anotherMsgContent
);
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
0L
))
.
thenReturn
(
Arrays
.
asList
(
someMsg
,
anotherMsg
,
anotherRepeatMsg
));
releaseMessageServiceWithCache
.
afterPropertiesSet
();
verify
(
bizConfig
).
releaseMessageCacheScanInterval
();
ReleaseMessage
latestReleaseMsg
=
releaseMessageServiceWithCache
.
findLatestReleaseMessageForMessages
(
Sets
.
newHashSet
(
someMsgContent
,
anotherMsgContent
));
assertNotNull
(
latestReleaseMsg
);
assertEquals
(
3
,
latestReleaseMsg
.
getId
());
assertEquals
(
anotherMsgContent
,
latestReleaseMsg
.
getMessage
());
List
<
ReleaseMessage
>
latestReleaseMsgGroupByMsgContent
=
releaseMessageServiceWithCache
.
findLatestReleaseMessagesGroupByMessages
(
Sets
.
newHashSet
(
someMsgContent
,
anotherMsgContent
));
assertEquals
(
2
,
latestReleaseMsgGroupByMsgContent
.
size
());
assertEquals
(
1
,
latestReleaseMsgGroupByMsgContent
.
get
(
1
).
getId
());
assertEquals
(
someMsgContent
,
latestReleaseMsgGroupByMsgContent
.
get
(
1
).
getMessage
());
assertEquals
(
3
,
latestReleaseMsgGroupByMsgContent
.
get
(
0
).
getId
());
assertEquals
(
anotherMsgContent
,
latestReleaseMsgGroupByMsgContent
.
get
(
0
).
getMessage
());
}
@Test
public
void
testWhenReleaseMsgSizeBiggerThan500
()
throws
Exception
{
String
someMsgContent
=
"msg1"
;
List
<
ReleaseMessage
>
firstBatchReleaseMsg
=
new
ArrayList
<>(
500
);
for
(
int
i
=
0
;
i
<
500
;
i
++)
{
firstBatchReleaseMsg
.
add
(
assembleReleaseMsg
(
i
+
1
,
someMsgContent
));
}
String
antherMsgContent
=
"msg2"
;
ReleaseMessage
antherMsg
=
assembleReleaseMsg
(
501
,
antherMsgContent
);
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
0L
))
.
thenReturn
(
firstBatchReleaseMsg
);
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
500L
))
.
thenReturn
(
Collections
.
singletonList
(
antherMsg
));
releaseMessageServiceWithCache
.
afterPropertiesSet
();
verify
(
releaseMessageRepository
,
times
(
1
)).
findFirst500ByIdGreaterThanOrderByIdAsc
(
500L
);
ReleaseMessage
latestReleaseMsg
=
releaseMessageServiceWithCache
.
findLatestReleaseMessageForMessages
(
Sets
.
newHashSet
(
someMsgContent
,
antherMsgContent
));
assertNotNull
(
latestReleaseMsg
);
assertEquals
(
501
,
latestReleaseMsg
.
getId
());
assertEquals
(
antherMsgContent
,
latestReleaseMsg
.
getMessage
());
List
<
ReleaseMessage
>
latestReleaseMsgGroupByMsgContent
=
releaseMessageServiceWithCache
.
findLatestReleaseMessagesGroupByMessages
(
Sets
.
newHashSet
(
someMsgContent
,
antherMsgContent
));
assertEquals
(
2
,
latestReleaseMsgGroupByMsgContent
.
size
());
assertEquals
(
500
,
latestReleaseMsgGroupByMsgContent
.
get
(
1
).
getId
());
assertEquals
(
501
,
latestReleaseMsgGroupByMsgContent
.
get
(
0
).
getId
());
}
@Test
public
void
testNewReleaseMessagesBeforeHandleMessage
()
throws
Exception
{
String
someMessageContent
=
"someMessage"
;
long
someMessageId
=
1
;
ReleaseMessage
someMessage
=
assembleReleaseMsg
(
someMessageId
,
someMessageContent
);
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
0L
)).
thenReturn
(
Lists
.
newArrayList
(
someMessage
));
releaseMessageServiceWithCache
.
afterPropertiesSet
();
ReleaseMessage
latestReleaseMsg
=
releaseMessageServiceWithCache
.
findLatestReleaseMessageForMessages
(
Sets
.
newHashSet
(
someMessageContent
));
List
<
ReleaseMessage
>
latestReleaseMsgGroupByMsgContent
=
releaseMessageServiceWithCache
.
findLatestReleaseMessagesGroupByMessages
(
Sets
.
newHashSet
(
someMessageContent
));
assertEquals
(
someMessageId
,
latestReleaseMsg
.
getId
());
assertEquals
(
someMessageContent
,
latestReleaseMsg
.
getMessage
());
assertEquals
(
latestReleaseMsg
,
latestReleaseMsgGroupByMsgContent
.
get
(
0
));
long
newMessageId
=
2
;
ReleaseMessage
newMessage
=
assembleReleaseMsg
(
newMessageId
,
someMessageContent
);
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
someMessageId
)).
thenReturn
(
Lists
.
newArrayList
(
newMessage
));
scanIntervalTimeUnit
.
sleep
(
scanInterval
*
3
);
ReleaseMessage
newLatestReleaseMsg
=
releaseMessageServiceWithCache
.
findLatestReleaseMessageForMessages
(
Sets
.
newHashSet
(
someMessageContent
));
List
<
ReleaseMessage
>
newLatestReleaseMsgGroupByMsgContent
=
releaseMessageServiceWithCache
.
findLatestReleaseMessagesGroupByMessages
(
Sets
.
newHashSet
(
someMessageContent
));
assertEquals
(
newMessageId
,
newLatestReleaseMsg
.
getId
());
assertEquals
(
someMessageContent
,
newLatestReleaseMsg
.
getMessage
());
assertEquals
(
newLatestReleaseMsg
,
newLatestReleaseMsgGroupByMsgContent
.
get
(
0
));
}
@Test
public
void
testNewReleasesWithHandleMessage
()
throws
Exception
{
String
someMessageContent
=
"someMessage"
;
long
someMessageId
=
1
;
ReleaseMessage
someMessage
=
assembleReleaseMsg
(
someMessageId
,
someMessageContent
);
when
(
releaseMessageRepository
.
findFirst500ByIdGreaterThanOrderByIdAsc
(
0L
)).
thenReturn
(
Lists
.
newArrayList
(
someMessage
));
releaseMessageServiceWithCache
.
afterPropertiesSet
();
ReleaseMessage
latestReleaseMsg
=
releaseMessageServiceWithCache
.
findLatestReleaseMessageForMessages
(
Sets
.
newHashSet
(
someMessageContent
));
List
<
ReleaseMessage
>
latestReleaseMsgGroupByMsgContent
=
releaseMessageServiceWithCache
.
findLatestReleaseMessagesGroupByMessages
(
Sets
.
newHashSet
(
someMessageContent
));
assertEquals
(
someMessageId
,
latestReleaseMsg
.
getId
());
assertEquals
(
someMessageContent
,
latestReleaseMsg
.
getMessage
());
assertEquals
(
latestReleaseMsg
,
latestReleaseMsgGroupByMsgContent
.
get
(
0
));
long
newMessageId
=
2
;
ReleaseMessage
newMessage
=
assembleReleaseMsg
(
newMessageId
,
someMessageContent
);
releaseMessageServiceWithCache
.
handleMessage
(
newMessage
,
Topics
.
APOLLO_RELEASE_TOPIC
);
ReleaseMessage
newLatestReleaseMsg
=
releaseMessageServiceWithCache
.
findLatestReleaseMessageForMessages
(
Sets
.
newHashSet
(
someMessageContent
));
List
<
ReleaseMessage
>
newLatestReleaseMsgGroupByMsgContent
=
releaseMessageServiceWithCache
.
findLatestReleaseMessagesGroupByMessages
(
Sets
.
newHashSet
(
someMessageContent
));
assertEquals
(
newMessageId
,
newLatestReleaseMsg
.
getId
());
assertEquals
(
someMessageContent
,
newLatestReleaseMsg
.
getMessage
());
assertEquals
(
newLatestReleaseMsg
,
newLatestReleaseMsgGroupByMsgContent
.
get
(
0
));
}
private
ReleaseMessage
assembleReleaseMsg
(
long
id
,
String
msgContent
)
{
ReleaseMessage
msg
=
new
ReleaseMessage
(
msgContent
);
msg
.
setId
(
id
);
return
msg
;
}
}
apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/util/WatchKeysUtilTest.java
浏览文件 @
4949eb55
...
...
@@ -5,7 +5,7 @@ import com.google.common.collect.Lists;
import
com.google.common.collect.Multimap
;
import
com.google.common.collect.Sets
;
import
com.ctrip.framework.apollo.
biz.service.AppNamespaceServic
e
;
import
com.ctrip.framework.apollo.
configservice.service.AppNamespaceServiceWithCach
e
;
import
com.ctrip.framework.apollo.common.entity.AppNamespace
;
import
com.ctrip.framework.apollo.core.ConfigConsts
;
...
...
@@ -29,7 +29,7 @@ import static org.mockito.Mockito.when;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
WatchKeysUtilTest
{
@Mock
private
AppNamespaceService
appNamespaceService
;
private
AppNamespaceService
WithCache
appNamespaceService
;
@Mock
private
AppNamespace
someAppNamespace
;
@Mock
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录