Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
wrr-cat
apollo
提交
2781e18c
apollo
项目概览
wrr-cat
/
apollo
与 Fork 源项目一致
从无法访问的项目Fork
通知
2
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
apollo
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
2781e18c
编写于
4月 20, 2016
作者:
J
Jason Song
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add client side long polling support and server side mock impl
上级
93d1ce67
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
349 addition
and
28 deletion
+349
-28
apollo-client/src/main/java/com/ctrip/apollo/internals/ConfigServiceLocator.java
...java/com/ctrip/apollo/internals/ConfigServiceLocator.java
+3
-3
apollo-client/src/main/java/com/ctrip/apollo/internals/DefaultConfig.java
...c/main/java/com/ctrip/apollo/internals/DefaultConfig.java
+7
-0
apollo-client/src/main/java/com/ctrip/apollo/internals/LocalFileConfigRepository.java
...com/ctrip/apollo/internals/LocalFileConfigRepository.java
+3
-1
apollo-client/src/main/java/com/ctrip/apollo/internals/RemoteConfigRepository.java
...va/com/ctrip/apollo/internals/RemoteConfigRepository.java
+124
-8
apollo-client/src/main/java/com/ctrip/apollo/internals/SimpleConfig.java
...rc/main/java/com/ctrip/apollo/internals/SimpleConfig.java
+2
-0
apollo-client/src/main/java/com/ctrip/apollo/model/ConfigChangeEvent.java
...c/main/java/com/ctrip/apollo/model/ConfigChangeEvent.java
+2
-1
apollo-client/src/main/java/com/ctrip/apollo/util/http/HttpUtil.java
...nt/src/main/java/com/ctrip/apollo/util/http/HttpUtil.java
+12
-9
apollo-client/src/test/java/com/ctrip/apollo/integration/ConfigIntegrationTest.java
...a/com/ctrip/apollo/integration/ConfigIntegrationTest.java
+62
-4
apollo-client/src/test/java/com/ctrip/apollo/internals/RemoteConfigRepositoryTest.java
...om/ctrip/apollo/internals/RemoteConfigRepositoryTest.java
+51
-1
apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/ConfigController.java
...rip/apollo/configservice/controller/ConfigController.java
+1
-1
apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/NotificationController.java
...ollo/configservice/controller/NotificationController.java
+82
-0
未找到文件。
apollo-client/src/main/java/com/ctrip/apollo/internals/ConfigServiceLocator.java
浏览文件 @
2781e18c
...
...
@@ -30,7 +30,7 @@ public class ConfigServiceLocator {
private
Type
m_responseType
;
/**
* Create a config service locator
* Create a config service locator
.
*/
public
ConfigServiceLocator
()
{
List
<
ServiceDTO
>
initial
=
Lists
.
newArrayList
();
...
...
@@ -88,8 +88,8 @@ public class ConfigServiceLocator {
throw
new
RuntimeException
(
"Get config services failed"
,
exception
);
}
private
void
logConfigServicesToCat
(
List
<
ServiceDTO
>
serviceD
TO
s
)
{
for
(
ServiceDTO
serviceDTO
:
serviceD
TO
s
)
{
private
void
logConfigServicesToCat
(
List
<
ServiceDTO
>
serviceD
to
s
)
{
for
(
ServiceDTO
serviceDTO
:
serviceD
to
s
)
{
Cat
.
logEvent
(
"Apollo.Config.Services"
,
serviceDTO
.
getHomepageUrl
());
}
}
...
...
apollo-client/src/main/java/com/ctrip/apollo/internals/DefaultConfig.java
浏览文件 @
2781e18c
...
...
@@ -95,7 +95,14 @@ public class DefaultConfig extends AbstractConfig implements RepositoryChangeLis
Map
<
String
,
ConfigChange
>
actualChanges
=
updateAndCalcConfigChanges
(
newConfigProperties
);
//check double checked result
if
(
actualChanges
.
isEmpty
())
{
return
;
}
this
.
fireConfigChange
(
new
ConfigChangeEvent
(
m_namespace
,
actualChanges
));
Cat
.
logEvent
(
"Apollo.Client.ConfigChanges"
,
m_namespace
);
}
private
Map
<
String
,
ConfigChange
>
updateAndCalcConfigChanges
(
Properties
newConfigProperties
)
{
...
...
apollo-client/src/main/java/com/ctrip/apollo/internals/LocalFileConfigRepository.java
浏览文件 @
2781e18c
...
...
@@ -120,7 +120,9 @@ public class LocalFileConfigRepository extends AbstractConfigRepository
updateFileProperties
(
properties
);
}
catch
(
Throwable
ex
)
{
Cat
.
logError
(
ex
);
logger
.
warn
(
"Sync config from fallback repository {} failed, reason: {}"
,
m_fallback
.
getClass
(),
ex
);
logger
.
warn
(
"Sync config from fallback repository {} failed, reason: {}"
,
m_fallback
.
getClass
(),
ex
);
}
}
...
...
apollo-client/src/main/java/com/ctrip/apollo/internals/RemoteConfigRepository.java
浏览文件 @
2781e18c
package
com.ctrip.apollo.internals
;
import
com.google.common.base.Joiner
;
import
com.google.common.base.Strings
;
import
com.google.common.collect.Lists
;
import
com.google.common.collect.Maps
;
import
com.google.common.escape.Escaper
;
import
com.google.common.net.UrlEscapers
;
import
com.ctrip.apollo.core.dto.ApolloConfig
;
import
com.ctrip.apollo.core.dto.ApolloConfigNotification
;
import
com.ctrip.apollo.core.dto.ServiceDTO
;
import
com.ctrip.apollo.core.utils.ApolloThreadFactory
;
import
com.ctrip.apollo.util.ConfigUtil
;
...
...
@@ -22,10 +27,14 @@ import org.unidal.lookup.ContainerLoader;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.Random
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicReference
;
/**
...
...
@@ -40,6 +49,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private
volatile
AtomicReference
<
ApolloConfig
>
m_configCache
;
private
final
String
m_namespace
;
private
final
ScheduledExecutorService
m_executorService
;
private
final
AtomicBoolean
m_longPollingStopped
;
/**
* Constructor.
...
...
@@ -58,10 +68,12 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
Cat
.
logError
(
ex
);
throw
new
IllegalStateException
(
"Unable to load component!"
,
ex
);
}
this
.
m_longPollingStopped
=
new
AtomicBoolean
(
false
);
this
.
m_executorService
=
Executors
.
newScheduledThreadPool
(
1
,
ApolloThreadFactory
.
create
(
"RemoteConfigRepository"
,
true
));
this
.
trySync
();
this
.
schedulePeriodicRefresh
();
this
.
scheduleLongPollingRefresh
();
}
@Override
...
...
@@ -84,7 +96,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
new
Runnable
()
{
@Override
public
void
run
()
{
Transaction
transaction
=
Cat
.
newTransaction
(
"Apollo.ConfigService"
,
"periodicRefresh"
);
trySync
();
transaction
.
setStatus
(
Message
.
SUCCESS
);
transaction
.
complete
();
}
},
m_configUtil
.
getRefreshInterval
(),
m_configUtil
.
getRefreshInterval
(),
m_configUtil
.
getRefreshTimeUnit
());
...
...
@@ -113,11 +128,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
return
result
;
}
private
ApolloConfig
loadApolloConfig
()
{
String
appId
=
m_configUtil
.
getAppId
();
String
cluster
=
m_configUtil
.
getCluster
();
Cat
.
logEvent
(
"Apollo.Client.Config"
,
String
.
format
(
"%s-%s-%s"
,
appId
,
cluster
,
m_namespace
));
Cat
.
logEvent
(
"Apollo.Client.ConfigInfo"
,
String
.
format
(
"%s-%s-%s"
,
appId
,
cluster
,
m_namespace
));
int
maxRetries
=
2
;
Throwable
exception
=
null
;
...
...
@@ -128,7 +143,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
for
(
ServiceDTO
configService
:
randomConfigServices
)
{
String
url
=
assembleUrl
(
configService
.
getHomepageUrl
(),
appId
,
cluster
,
m_namespace
,
assemble
QueryConfig
Url
(
configService
.
getHomepageUrl
(),
appId
,
cluster
,
m_namespace
,
m_configCache
.
get
());
logger
.
debug
(
"Loading config from {}"
,
url
);
...
...
@@ -172,18 +187,19 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
throw
new
RuntimeException
(
message
,
exception
);
}
private
String
assembleUrl
(
String
uri
,
String
appId
,
String
cluster
,
String
namespace
,
ApolloConfig
previousConfig
)
{
private
String
assembleQueryConfigUrl
(
String
uri
,
String
appId
,
String
cluster
,
String
namespace
,
ApolloConfig
previousConfig
)
{
Escaper
escaper
=
UrlEscapers
.
urlPathSegmentEscaper
();
String
path
=
"configs/%s/%s"
;
List
<
String
>
params
=
Lists
.
newArrayList
(
appId
,
cluster
);
List
<
String
>
params
=
Lists
.
newArrayList
(
escaper
.
escape
(
appId
),
escaper
.
escape
(
cluster
)
);
if
(!
Strings
.
isNullOrEmpty
(
namespace
))
{
path
=
path
+
"/%s"
;
params
.
add
(
namespace
);
params
.
add
(
escaper
.
escape
(
namespace
)
);
}
if
(
previousConfig
!=
null
)
{
path
=
path
+
"?releaseId=%s"
;
params
.
add
(
String
.
valueOf
(
previousConfig
.
getReleaseId
(
)));
params
.
add
(
escaper
.
escape
(
String
.
valueOf
(
previousConfig
.
getReleaseId
()
)));
}
String
pathExpanded
=
String
.
format
(
path
,
params
.
toArray
());
...
...
@@ -193,6 +209,106 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
return
uri
+
pathExpanded
;
}
private
void
scheduleLongPollingRefresh
()
{
final
String
appId
=
m_configUtil
.
getAppId
();
final
String
cluster
=
m_configUtil
.
getCluster
();
final
ExecutorService
longPollingService
=
Executors
.
newFixedThreadPool
(
2
,
ApolloThreadFactory
.
create
(
"RemoteConfigRepository-LongPolling"
,
true
));
longPollingService
.
submit
(
new
Runnable
()
{
@Override
public
void
run
()
{
doLongPollingRefresh
(
appId
,
cluster
,
longPollingService
);
}
});
}
private
void
doLongPollingRefresh
(
String
appId
,
String
cluster
,
ExecutorService
longPollingService
)
{
final
Random
random
=
new
Random
();
ServiceDTO
lastServiceDto
=
null
;
Transaction
transaction
=
null
;
while
(!
m_longPollingStopped
.
get
()
&&
!
Thread
.
currentThread
().
isInterrupted
())
{
try
{
if
(
lastServiceDto
==
null
)
{
List
<
ServiceDTO
>
configServices
=
getConfigServices
();
lastServiceDto
=
configServices
.
get
(
random
.
nextInt
(
configServices
.
size
()));
}
String
url
=
assembleLongPollRefreshUrl
(
lastServiceDto
.
getHomepageUrl
(),
appId
,
cluster
,
m_namespace
,
m_configCache
.
get
());
logger
.
debug
(
"Long polling from {}"
,
url
);
HttpRequest
request
=
new
HttpRequest
(
url
);
//no timeout for read
request
.
setReadTimeout
(
0
);
transaction
=
Cat
.
newTransaction
(
"Apollo.ConfigService"
,
"pollNotification"
);
transaction
.
addData
(
"Url"
,
url
);
HttpResponse
<
ApolloConfigNotification
>
response
=
m_httpUtil
.
doGet
(
request
,
ApolloConfigNotification
.
class
);
logger
.
debug
(
"Long polling response: {}, url: {}"
,
response
.
getStatusCode
(),
url
);
if
(
response
.
getStatusCode
()
==
200
)
{
longPollingService
.
submit
(
new
Runnable
()
{
@Override
public
void
run
()
{
trySync
();
}
});
}
transaction
.
addData
(
"StatusCode"
,
response
.
getStatusCode
());
transaction
.
setStatus
(
Message
.
SUCCESS
);
}
catch
(
Throwable
ex
)
{
logger
.
warn
(
"Long polling failed for appId: {}, cluster: {}, namespace: {}, reason: {}"
,
appId
,
cluster
,
m_namespace
,
ex
);
lastServiceDto
=
null
;
Cat
.
logError
(
ex
);
if
(
transaction
!=
null
)
{
transaction
.
setStatus
(
ex
);
}
try
{
TimeUnit
.
SECONDS
.
sleep
(
10
);
}
catch
(
InterruptedException
ie
)
{
//ignore
}
}
finally
{
if
(
transaction
!=
null
)
{
transaction
.
complete
();
}
}
}
}
private
String
assembleLongPollRefreshUrl
(
String
uri
,
String
appId
,
String
cluster
,
String
namespace
,
ApolloConfig
previousConfig
)
{
Escaper
escaper
=
UrlEscapers
.
urlPathSegmentEscaper
();
Map
<
String
,
String
>
queryParams
=
Maps
.
newHashMap
();
queryParams
.
put
(
"appId"
,
escaper
.
escape
(
appId
));
queryParams
.
put
(
"cluster"
,
escaper
.
escape
(
cluster
));
if
(!
Strings
.
isNullOrEmpty
(
namespace
))
{
queryParams
.
put
(
"namespace"
,
escaper
.
escape
(
namespace
));
}
if
(
previousConfig
!=
null
)
{
queryParams
.
put
(
"releaseId"
,
escaper
.
escape
(
previousConfig
.
getReleaseId
()));
}
String
params
=
Joiner
.
on
(
"&"
).
withKeyValueSeparator
(
"="
).
join
(
queryParams
);
if
(!
uri
.
endsWith
(
"/"
))
{
uri
+=
"/"
;
}
return
uri
+
"notifications?"
+
params
;
}
void
stopLongPollingRefresh
()
{
this
.
m_longPollingStopped
.
compareAndSet
(
false
,
true
);
}
private
List
<
ServiceDTO
>
getConfigServices
()
{
List
<
ServiceDTO
>
services
=
m_serviceLocator
.
getConfigServices
();
if
(
services
.
size
()
==
0
)
{
...
...
apollo-client/src/main/java/com/ctrip/apollo/internals/SimpleConfig.java
浏览文件 @
2781e18c
...
...
@@ -74,5 +74,7 @@ public class SimpleConfig extends AbstractConfig implements RepositoryChangeList
m_configProperties
=
newConfigProperties
;
this
.
fireConfigChange
(
new
ConfigChangeEvent
(
m_namespace
,
changeMap
));
Cat
.
logEvent
(
"Apollo.Client.ConfigChanges"
,
m_namespace
);
}
}
apollo-client/src/main/java/com/ctrip/apollo/model/ConfigChangeEvent.java
浏览文件 @
2781e18c
...
...
@@ -40,7 +40,8 @@ public class ConfigChangeEvent {
}
/**
* Get the changes. Please note that the returned Map is immutable.
* Get the changes as <Key, Change> map.
* Please note that the returned Map is immutable.
* @return changes
*/
public
Map
<
String
,
ConfigChange
>
getChanges
()
{
...
...
apollo-client/src/main/java/com/ctrip/apollo/util/http/HttpUtil.java
浏览文件 @
2781e18c
...
...
@@ -27,20 +27,23 @@ public class HttpUtil {
private
ConfigUtil
m_configUtil
;
private
Gson
gson
;
private
String
basicAuth
;
/**
* Constructor.
*/
public
HttpUtil
()
{
gson
=
new
Gson
();
try
{
basicAuth
=
"Basic "
+
BaseEncoding
.
base64
().
encode
(
"user:"
.
getBytes
(
"UTF-8"
));
}
catch
(
UnsupportedEncodingException
e
)
{
e
.
printStackTrace
();
basicAuth
=
"Basic "
+
BaseEncoding
.
base64
().
encode
(
"user:"
.
getBytes
(
"UTF-8"
));
}
catch
(
UnsupportedEncodingException
e
x
)
{
e
x
.
printStackTrace
();
}
}
/**
* Do get operation for the http request.
*
* @param httpRequest the request
* @param httpRequest
the request
* @param responseType the response type
* @return the response
* @throws RuntimeException if any error happened or response code is neither 200 nor 304
...
...
@@ -59,7 +62,7 @@ public class HttpUtil {
/**
* Do get operation for the http request.
*
* @param httpRequest the request
* @param httpRequest
the request
* @param responseType the response type
* @return the response
* @throws RuntimeException if any error happened or response code is neither 200 nor 304
...
...
@@ -76,14 +79,14 @@ public class HttpUtil {
}
private
<
T
>
HttpResponse
<
T
>
doGetWithSerializeFunction
(
HttpRequest
httpRequest
,
Function
<
String
,
T
>
serializeFunction
)
{
Function
<
String
,
T
>
serializeFunction
)
{
InputStream
is
=
null
;
try
{
HttpURLConnection
conn
=
(
HttpURLConnection
)
new
URL
(
httpRequest
.
getUrl
()).
openConnection
();
conn
.
setRequestMethod
(
"GET"
);
conn
.
setRequestProperty
(
"Authorization"
,
basicAuth
);
conn
.
setRequestProperty
(
"Authorization"
,
basicAuth
);
int
connectTimeout
=
httpRequest
.
getConnectTimeout
();
if
(
connectTimeout
<
0
)
{
connectTimeout
=
m_configUtil
.
getConnectTimeout
();
...
...
apollo-client/src/test/java/com/ctrip/apollo/integration/ConfigIntegrationTest.java
浏览文件 @
2781e18c
...
...
@@ -9,6 +9,7 @@ import com.ctrip.apollo.ConfigChangeListener;
import
com.ctrip.apollo.ConfigService
;
import
com.ctrip.apollo.core.ConfigConsts
;
import
com.ctrip.apollo.core.dto.ApolloConfig
;
import
com.ctrip.apollo.core.dto.ApolloConfigNotification
;
import
com.ctrip.apollo.core.utils.ClassLoaderUtil
;
import
com.ctrip.apollo.model.ConfigChangeEvent
;
...
...
@@ -196,6 +197,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
config
.
addChangeListener
(
new
ConfigChangeListener
()
{
AtomicInteger
counter
=
new
AtomicInteger
(
0
);
@Override
public
void
onChange
(
ConfigChangeEvent
changeEvent
)
{
//only need to assert once
...
...
@@ -220,6 +222,66 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
assertEquals
(
anotherValue
,
config
.
getProperty
(
someKey
,
null
));
}
@Test
public
void
testLongPollRefresh
()
throws
Exception
{
final
String
someKey
=
"someKey"
;
final
String
someValue
=
"someValue"
;
final
String
anotherValue
=
"anotherValue"
;
Map
<
String
,
String
>
configurations
=
Maps
.
newHashMap
();
configurations
.
put
(
someKey
,
someValue
);
ApolloConfig
apolloConfig
=
assembleApolloConfig
(
configurations
);
ContextHandler
configHandler
=
mockConfigServerHandler
(
HttpServletResponse
.
SC_OK
,
apolloConfig
);
ContextHandler
pollHandler
=
mockPollNotificationHandler
(
50
,
HttpServletResponse
.
SC_OK
,
new
ApolloConfigNotification
(
apolloConfig
.
getAppId
(),
apolloConfig
.
getCluster
(),
apolloConfig
.
getNamespace
()),
false
);
startServerWithHandlers
(
configHandler
,
pollHandler
);
Config
config
=
ConfigService
.
getConfig
();
assertEquals
(
someValue
,
config
.
getProperty
(
someKey
,
null
));
apolloConfig
.
getConfigurations
().
put
(
someKey
,
anotherValue
);
TimeUnit
.
MILLISECONDS
.
sleep
(
60
);
assertEquals
(
anotherValue
,
config
.
getProperty
(
someKey
,
null
));
}
private
ContextHandler
mockPollNotificationHandler
(
final
long
pollResultTimeOutInMS
,
final
int
statusCode
,
final
ApolloConfigNotification
result
,
final
boolean
failedAtFirstTime
)
{
ContextHandler
context
=
new
ContextHandler
(
"/notifications"
);
context
.
setHandler
(
new
AbstractHandler
()
{
AtomicInteger
counter
=
new
AtomicInteger
(
0
);
@Override
public
void
handle
(
String
target
,
Request
baseRequest
,
HttpServletRequest
request
,
HttpServletResponse
response
)
throws
IOException
,
ServletException
{
if
(
failedAtFirstTime
&&
counter
.
incrementAndGet
()
==
1
)
{
response
.
setStatus
(
HttpServletResponse
.
SC_INTERNAL_SERVER_ERROR
);
baseRequest
.
setHandled
(
true
);
return
;
}
try
{
TimeUnit
.
MILLISECONDS
.
sleep
(
pollResultTimeOutInMS
);
}
catch
(
InterruptedException
e
)
{
}
response
.
setContentType
(
"application/json;charset=UTF-8"
);
response
.
setStatus
(
statusCode
);
response
.
getWriter
().
println
(
gson
.
toJson
(
result
));
baseRequest
.
setHandled
(
true
);
}
});
return
context
;
}
private
ContextHandler
mockConfigServerHandler
(
final
int
statusCode
,
final
ApolloConfig
result
,
final
boolean
failedAtFirstTime
)
{
ContextHandler
context
=
new
ContextHandler
(
"/configs/*"
);
...
...
@@ -237,15 +299,11 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
response
.
setContentType
(
"application/json;charset=UTF-8"
);
response
.
setStatus
(
statusCode
);
response
.
getWriter
().
println
(
gson
.
toJson
(
result
));
baseRequest
.
setHandled
(
true
);
}
});
return
context
;
}
...
...
apollo-client/src/test/java/com/ctrip/apollo/internals/RemoteConfigRepositoryTest.java
浏览文件 @
2781e18c
...
...
@@ -5,6 +5,7 @@ import com.google.common.collect.Lists;
import
com.google.common.collect.Maps
;
import
com.ctrip.apollo.core.dto.ApolloConfig
;
import
com.ctrip.apollo.core.dto.ApolloConfigNotification
;
import
com.ctrip.apollo.core.dto.ServiceDTO
;
import
com.ctrip.apollo.util.ConfigUtil
;
import
com.ctrip.apollo.util.http.HttpRequest
;
...
...
@@ -22,6 +23,9 @@ import org.unidal.lookup.ComponentTestCase;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Properties
;
import
java.util.concurrent.TimeUnit
;
import
javax.servlet.http.HttpServletResponse
;
import
static
org
.
junit
.
Assert
.
assertEquals
;
import
static
org
.
mockito
.
Matchers
.
eq
;
...
...
@@ -41,6 +45,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
@Mock
private
static
HttpResponse
<
ApolloConfig
>
someResponse
;
@Mock
private
static
HttpResponse
<
ApolloConfigNotification
>
pollResponse
;
@Mock
private
ConfigUtil
someConfigUtil
;
@Before
...
...
@@ -48,6 +54,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
super
.
setUp
();
someNamespace
=
"someName"
;
when
(
pollResponse
.
getStatusCode
()).
thenReturn
(
HttpServletResponse
.
SC_NOT_MODIFIED
);
defineComponent
(
ConfigUtil
.
class
,
MockConfigUtil
.
class
);
defineComponent
(
ConfigServiceLocator
.
class
,
MockConfigServiceLocator
.
class
);
defineComponent
(
HttpUtil
.
class
,
MockHttpUtil
.
class
);
...
...
@@ -65,9 +73,11 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
when
(
someResponse
.
getBody
()).
thenReturn
(
someApolloConfig
);
RemoteConfigRepository
remoteConfigRepository
=
new
RemoteConfigRepository
(
someNamespace
);
Properties
config
=
remoteConfigRepository
.
getConfig
();
assertEquals
(
configurations
,
config
);
remoteConfigRepository
.
stopLongPollingRefresh
();
}
@Test
(
expected
=
RuntimeException
.
class
)
...
...
@@ -76,6 +86,10 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
when
(
someResponse
.
getStatusCode
()).
thenReturn
(
500
);
RemoteConfigRepository
remoteConfigRepository
=
new
RemoteConfigRepository
(
someNamespace
);
//must stop the long polling before exception occurred
remoteConfigRepository
.
stopLongPollingRefresh
();
remoteConfigRepository
.
getConfig
();
}
...
...
@@ -102,6 +116,35 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
verify
(
someListener
,
times
(
1
)).
onRepositoryChange
(
eq
(
someNamespace
),
captor
.
capture
());
assertEquals
(
newConfigurations
,
captor
.
getValue
());
remoteConfigRepository
.
stopLongPollingRefresh
();
}
@Test
public
void
testLongPollingRefresh
()
throws
Exception
{
Map
<
String
,
String
>
configurations
=
ImmutableMap
.
of
(
"someKey"
,
"someValue"
);
ApolloConfig
someApolloConfig
=
assembleApolloConfig
(
configurations
);
when
(
someResponse
.
getStatusCode
()).
thenReturn
(
200
);
when
(
someResponse
.
getBody
()).
thenReturn
(
someApolloConfig
);
RepositoryChangeListener
someListener
=
mock
(
RepositoryChangeListener
.
class
);
RemoteConfigRepository
remoteConfigRepository
=
new
RemoteConfigRepository
(
someNamespace
);
remoteConfigRepository
.
addChangeListener
(
someListener
);
final
ArgumentCaptor
<
Properties
>
captor
=
ArgumentCaptor
.
forClass
(
Properties
.
class
);
Map
<
String
,
String
>
newConfigurations
=
ImmutableMap
.
of
(
"someKey"
,
"anotherValue"
);
ApolloConfig
newApolloConfig
=
assembleApolloConfig
(
newConfigurations
);
when
(
pollResponse
.
getStatusCode
()).
thenReturn
(
HttpServletResponse
.
SC_OK
);
when
(
someResponse
.
getBody
()).
thenReturn
(
newApolloConfig
);
TimeUnit
.
MILLISECONDS
.
sleep
(
60
);
remoteConfigRepository
.
stopLongPollingRefresh
();
verify
(
someListener
,
times
(
1
)).
onRepositoryChange
(
eq
(
someNamespace
),
captor
.
capture
());
assertEquals
(
newConfigurations
,
captor
.
getValue
());
}
private
ApolloConfig
assembleApolloConfig
(
Map
<
String
,
String
>
configurations
)
{
...
...
@@ -109,7 +152,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
String
someClusterName
=
"cluster"
;
String
someReleaseId
=
"1"
;
ApolloConfig
apolloConfig
=
new
ApolloConfig
(
someAppId
,
someClusterName
,
someNamespace
,
someReleaseId
);
new
ApolloConfig
(
someAppId
,
someClusterName
,
someNamespace
,
someReleaseId
);
apolloConfig
.
setConfigurations
(
configurations
);
...
...
@@ -143,6 +186,13 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
public
static
class
MockHttpUtil
extends
HttpUtil
{
@Override
public
<
T
>
HttpResponse
<
T
>
doGet
(
HttpRequest
httpRequest
,
Class
<
T
>
responseType
)
{
if
(
httpRequest
.
getUrl
().
contains
(
"notifications?"
))
{
try
{
TimeUnit
.
MILLISECONDS
.
sleep
(
50
);
}
catch
(
InterruptedException
e
)
{
}
return
(
HttpResponse
<
T
>)
pollResponse
;
}
return
(
HttpResponse
<
T
>)
someResponse
;
}
}
...
...
apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/ConfigController.java
浏览文件 @
2781e18c
...
...
@@ -40,7 +40,7 @@ public class ConfigController {
@RequestParam
(
value
=
"releaseId"
,
defaultValue
=
"-1"
)
String
clientSideReleaseId
,
HttpServletResponse
response
)
throws
IOException
{
Release
release
=
configService
.
findRelease
(
appId
,
clusterName
,
namespace
);
//TODO if namespace != application, should also query config by namespace and DC?
//TODO if namespace != application, should also query config by namespace and DC
(default if DC not found)
?
//And if found, should merge config, as well as releaseId -> make releaseId a string?
if
(
release
==
null
)
{
response
.
sendError
(
HttpServletResponse
.
SC_NOT_FOUND
,
...
...
apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/NotificationController.java
0 → 100644
浏览文件 @
2781e18c
package
com.ctrip.apollo.configservice.controller
;
import
com.google.common.collect.HashMultimap
;
import
com.google.common.collect.Multimap
;
import
com.google.common.collect.Multimaps
;
import
com.ctrip.apollo.core.ConfigConsts
;
import
com.ctrip.apollo.core.dto.ApolloConfigNotification
;
import
com.ctrip.apollo.core.utils.ApolloThreadFactory
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RequestMethod
;
import
org.springframework.web.bind.annotation.RequestParam
;
import
org.springframework.web.bind.annotation.RestController
;
import
org.springframework.web.context.request.async.DeferredResult
;
import
java.util.Random
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
import
javax.servlet.http.HttpServletResponse
;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RestController
@RequestMapping
(
"/notifications"
)
public
class
NotificationController
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
NotificationController
.
class
);
private
final
static
long
TIMEOUT
=
60
*
60
*
1000
;
//60 MINUTES
private
final
Multimap
<
String
,
DeferredResult
<
ApolloConfigNotification
>>
deferredResults
=
Multimaps
.
synchronizedSetMultimap
(
HashMultimap
.
create
());
{
startRandomChange
();
}
@RequestMapping
(
method
=
RequestMethod
.
GET
)
public
DeferredResult
<
ApolloConfigNotification
>
pollNotification
(
@RequestParam
(
value
=
"appId"
)
String
appId
,
@RequestParam
(
value
=
"cluster"
)
String
cluster
,
@RequestParam
(
value
=
"namespace"
,
defaultValue
=
ConfigConsts
.
NAMESPACE_APPLICATION
)
String
namespace
,
@RequestParam
(
value
=
"releaseId"
,
defaultValue
=
"-1"
)
String
clientSideReleaseId
,
HttpServletResponse
response
)
{
DeferredResult
<
ApolloConfigNotification
>
deferredResult
=
new
DeferredResult
<>(
TIMEOUT
);
String
key
=
assembleKey
(
appId
,
cluster
,
namespace
);
this
.
deferredResults
.
put
(
key
,
deferredResult
);
deferredResult
.
onCompletion
(()
->
{
logger
.
info
(
"deferred result for {} {} {} completed"
,
appId
,
cluster
,
namespace
);
deferredResults
.
remove
(
key
,
deferredResult
);
});
deferredResult
.
onTimeout
(()
->
{
logger
.
info
(
"deferred result for {} {} {} timeout"
,
appId
,
cluster
,
namespace
);
response
.
setStatus
(
HttpServletResponse
.
SC_NOT_MODIFIED
);
});
logger
.
info
(
"deferred result for {} {} {} returned"
,
appId
,
cluster
,
namespace
);
return
deferredResult
;
}
private
void
startRandomChange
()
{
Random
random
=
new
Random
();
ScheduledExecutorService
testService
=
Executors
.
newScheduledThreadPool
(
1
,
ApolloThreadFactory
.
create
(
"NotificationController"
,
true
));
testService
.
scheduleAtFixedRate
((
Runnable
)
()
->
deferredResults
.
entries
().
stream
().
filter
(
entry
->
random
.
nextBoolean
()).
forEach
(
entry
->
{
String
[]
keys
=
entry
.
getKey
().
split
(
"-"
);
entry
.
getValue
().
setResult
(
new
ApolloConfigNotification
(
keys
[
0
],
keys
[
1
],
keys
[
2
]));
}),
30
,
30
,
TimeUnit
.
SECONDS
);
}
private
String
assembleKey
(
String
appId
,
String
cluster
,
String
namespace
)
{
return
String
.
format
(
"%s-%s-%s"
,
appId
,
cluster
,
namespace
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录