Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
梦中观雨
cat
提交
a67d6211
C
cat
项目概览
梦中观雨
/
cat
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
cat
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a67d6211
编写于
7月 29, 2014
作者:
Y
youyong205
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor channel code
上级
18c59ecb
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
137 addition
and
61 deletion
+137
-61
cat-client/src/main/java/com/dianping/cat/configuration/ClientConfigManager.java
...a/com/dianping/cat/configuration/ClientConfigManager.java
+1
-1
cat-client/src/main/java/com/dianping/cat/message/io/ChannelManager.java
...main/java/com/dianping/cat/message/io/ChannelManager.java
+27
-28
cat-core/src/main/java/com/dianping/cat/DomainManager.java
cat-core/src/main/java/com/dianping/cat/DomainManager.java
+1
-1
cat-home/src/main/java/com/dianping/cat/report/service/impl/RouterConfigService.java
...dianping/cat/report/service/impl/RouterConfigService.java
+1
-5
cat-home/src/main/java/com/dianping/cat/report/task/alert/sender/sender/SmsSender.java
...anping/cat/report/task/alert/sender/sender/SmsSender.java
+1
-1
cat-home/src/main/java/com/dianping/cat/report/task/router/RouterConfigBuilder.java
.../dianping/cat/report/task/router/RouterConfigBuilder.java
+79
-16
cat-home/src/test/java/com/dianping/cat/report/analyzer/RouterBuilderTest.java
...a/com/dianping/cat/report/analyzer/RouterBuilderTest.java
+27
-9
未找到文件。
cat-client/src/main/java/com/dianping/cat/configuration/ClientConfigManager.java
浏览文件 @
a67d6211
...
...
@@ -159,7 +159,7 @@ public class ClientConfigManager implements LogEnabled {
for
(
Server
server
:
servers
)
{
Integer
httpPort
=
server
.
getHttpPort
();
if
(
httpPort
==
null
)
{
if
(
httpPort
==
null
||
httpPort
==
0
)
{
httpPort
=
8080
;
}
return
String
.
format
(
"http://%s:%d/cat/s/router?domain=%s"
,
server
.
getIp
(),
httpPort
,
getDomain
().
getId
());
...
...
cat-client/src/main/java/com/dianping/cat/message/io/ChannelManager.java
浏览文件 @
a67d6211
...
...
@@ -59,7 +59,7 @@ public class ChannelManager implements Task {
private
MessageQueue
m_queue
;
private
String
m_
lastServers
;
private
String
m_
activeServerConfig
;
public
ChannelManager
(
Logger
logger
,
List
<
InetSocketAddress
>
serverAddresses
,
MessageQueue
queue
,
ClientConfigManager
configManager
)
{
...
...
@@ -84,16 +84,14 @@ public class ChannelManager implements Task {
m_bootstrap
=
bootstrap
;
String
serverConfig
=
get
ServerConfig
();
String
serverConfig
=
load
ServerConfig
();
if
(
serverConfig
!=
null
)
{
List
<
InetSocketAddress
>
newAddress
=
parse
(
serverConfig
);
List
<
InetSocketAddress
>
newAddress
=
parse
SocketAddress
(
serverConfig
);
initChannel
(
newAddress
);
m_lastServers
=
serverConfig
;
initChannel
(
newAddress
,
serverConfig
);
}
else
{
initChannel
(
serverAddresses
);
m_lastServers
=
null
;
initChannel
(
serverAddresses
,
null
);
}
}
...
...
@@ -148,20 +146,7 @@ public class ChannelManager implements Task {
return
"TcpSocketSender-ChannelManager"
;
}
private
String
getServerConfig
()
{
try
{
String
url
=
m_configManager
.
getServerConfigUrl
();
InputStream
currentServer
=
Urls
.
forIO
().
readTimeout
(
3000
).
connectTimeout
(
1000
).
openStream
(
url
);
String
content
=
Files
.
forIO
().
readFrom
(
currentServer
,
"utf-8"
);
return
content
.
trim
();
}
catch
(
Exception
e
)
{
}
return
null
;
}
private
void
initChannel
(
List
<
InetSocketAddress
>
addresses
)
{
private
void
initChannel
(
List
<
InetSocketAddress
>
addresses
,
String
serverConfig
)
{
try
{
StringBuilder
sb
=
new
StringBuilder
();
...
...
@@ -179,11 +164,12 @@ public class ChannelManager implements Task {
if
(
future
!=
null
)
{
m_activeFuture
=
future
;
m_activeIndex
=
i
;
m_activeServerConfig
=
serverConfig
;
break
;
}
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
(
);
m_logger
.
error
(
e
.
getMessage
(),
e
);
// ignore
}
}
...
...
@@ -208,7 +194,20 @@ public class ChannelManager implements Task {
}
}
private
List
<
InetSocketAddress
>
parse
(
String
content
)
{
private
String
loadServerConfig
()
{
try
{
String
url
=
m_configManager
.
getServerConfigUrl
();
InputStream
currentServer
=
Urls
.
forIO
().
readTimeout
(
2000
).
connectTimeout
(
1000
).
openStream
(
url
);
String
content
=
Files
.
forIO
().
readFrom
(
currentServer
,
"utf-8"
);
return
content
.
trim
();
}
catch
(
Exception
e
)
{
m_logger
.
error
(
e
.
getMessage
(),
e
);
}
return
null
;
}
private
List
<
InetSocketAddress
>
parseSocketAddress
(
String
content
)
{
try
{
List
<
String
>
strs
=
Splitters
.
by
(
";"
).
noEmptyItem
().
split
(
content
);
List
<
InetSocketAddress
>
address
=
new
ArrayList
<
InetSocketAddress
>();
...
...
@@ -229,6 +228,7 @@ public class ChannelManager implements Task {
public
void
run
()
{
while
(
m_active
)
{
m_count
++;
if
(
shouldCheckServerConfig
(
m_count
))
{
Pair
<
Boolean
,
String
>
pair
=
serverConfigChanged
();
...
...
@@ -236,10 +236,9 @@ public class ChannelManager implements Task {
closeAllChannel
();
String
servers
=
pair
.
getValue
();
List
<
InetSocketAddress
>
serverAddresses
=
parse
(
servers
);
List
<
InetSocketAddress
>
serverAddresses
=
parse
SocketAddress
(
servers
);
initChannel
(
serverAddresses
);
m_lastServers
=
servers
;
initChannel
(
serverAddresses
,
servers
);
}
}
...
...
@@ -288,9 +287,9 @@ public class ChannelManager implements Task {
}
private
Pair
<
Boolean
,
String
>
serverConfigChanged
()
{
String
current
=
get
ServerConfig
();
String
current
=
load
ServerConfig
();
if
(
current
!=
null
&&
!
current
.
equals
(
m_
lastServers
))
{
if
(
current
!=
null
&&
!
current
.
equals
(
m_
activeServerConfig
))
{
return
new
Pair
<
Boolean
,
String
>(
true
,
current
);
}
else
{
return
new
Pair
<
Boolean
,
String
>(
false
,
current
);
...
...
cat-core/src/main/java/com/dianping/cat/DomainManager.java
浏览文件 @
a67d6211
...
...
@@ -69,7 +69,7 @@ public class DomainManager implements Initializable, LogEnabled {
public
void
enableLogging
(
Logger
logger
)
{
m_logger
=
logger
;
}
@Override
public
void
initialize
()
throws
InitializationException
{
if
(!
m_manager
.
isLocalMode
())
{
...
...
cat-home/src/main/java/com/dianping/cat/report/service/impl/RouterConfigService.java
浏览文件 @
a67d6211
...
...
@@ -23,11 +23,7 @@ public class RouterConfigService extends AbstractReportService<RouterConfig> {
@Override
public
RouterConfig
makeReport
(
String
domain
,
Date
start
,
Date
end
)
{
RouterConfig
report
=
new
RouterConfig
(
domain
);
report
.
setStartTime
(
start
);
report
.
setEndTime
(
end
);
return
report
;
return
null
;
}
@Override
...
...
cat-home/src/main/java/com/dianping/cat/report/task/alert/sender/sender/SmsSender.java
浏览文件 @
a67d6211
...
...
@@ -19,7 +19,7 @@ import com.dianping.cat.report.task.alert.sender.AlertMessageEntity;
public
class
SmsSender
implements
Sender
,
LogEnabled
{
public
static
final
String
ID
=
AlertConstants
.
SMS
;
private
Logger
m_logger
;
@Override
...
...
cat-home/src/main/java/com/dianping/cat/report/task/router/RouterConfigBuilder.java
浏览文件 @
a67d6211
...
...
@@ -37,31 +37,94 @@ public class RouterConfigBuilder implements ReportTaskBuilder {
@Inject
private
RouterConfigManager
m_configManager
;
private
boolean
needRebuild
(
StateReport
report
,
RouterConfig
config
)
{
if
(
config
!=
null
)
{
Map
<
String
,
Long
>
serverProcesses
=
new
LinkedHashMap
<
String
,
Long
>();
StateReportVisitor
visitor
=
new
StateReportVisitor
();
visitor
.
visitStateReport
(
report
);
Map
<
String
,
Long
>
numbers
=
visitor
.
getNumbers
();
for
(
Entry
<
String
,
Long
>
entry
:
numbers
.
entrySet
())
{
String
domain
=
entry
.
getKey
();
Long
count
=
entry
.
getValue
();
Domain
serverConfig
=
config
.
findDomain
(
domain
);
if
(
serverConfig
!=
null
)
{
Server
server
=
serverConfig
.
getServers
().
get
(
0
);
String
serverId
=
server
.
getId
();
Long
value
=
serverProcesses
.
get
(
serverId
);
if
(
value
==
null
)
{
serverProcesses
.
put
(
serverId
,
count
);
}
else
{
serverProcesses
.
put
(
serverId
,
count
+
value
);
}
}
}
long
min
=
Integer
.
MAX_VALUE
;
long
max
=
Integer
.
MIN_VALUE
;
for
(
Entry
<
String
,
Long
>
entry
:
serverProcesses
.
entrySet
())
{
long
value
=
entry
.
getValue
();
if
(
value
>
max
)
{
max
=
value
;
}
if
(
value
<
min
)
{
min
=
value
;
}
}
if
(
max
*
1.0
/
min
>
1.4
)
{
return
true
;
}
else
{
return
false
;
}
}
else
{
return
true
;
}
}
@Override
public
boolean
buildDailyTask
(
String
name
,
String
domain
,
Date
period
)
{
Date
yesterday
=
new
Date
(
period
.
getTime
()
-
TimeUtil
.
ONE_DAY
);
RouterConfig
yesterdayConfig
=
m_reportService
.
queryRouterConfigReport
(
Constants
.
CAT
,
yesterday
,
period
);
Date
start
=
period
;
Date
end
=
new
Date
(
start
.
getTime
()
+
TimeUtil
.
ONE_DAY
);
StateReport
report
=
m_reportService
.
queryStateReport
(
Constants
.
CAT
,
start
,
end
);
StateReportVisitor
visitor
=
new
StateReportVisitor
();
RouterConfig
routerConfig
=
new
RouterConfig
(
Constants
.
CAT
);
routerConfig
.
setStartTime
(
period
);
routerConfig
.
setEndTime
(
new
Date
(
period
.
getTime
()
+
TimeUtil
.
ONE_DAY
));
visitor
.
visitStateReport
(
report
);
boolean
need
=
needRebuild
(
report
,
yesterdayConfig
);
RouterConfig
routerConfig
;
Map
<
String
,
Long
>
numbers
=
visitor
.
getNumbers
();
Comparator
<
Entry
<
String
,
Long
>>
compator
=
new
Comparator
<
Map
.
Entry
<
String
,
Long
>>()
{
if
(
need
)
{
routerConfig
=
new
RouterConfig
(
Constants
.
CAT
);
StateReportVisitor
visitor
=
new
StateReportVisitor
();
@Override
public
int
compare
(
Entry
<
String
,
Long
>
o1
,
Entry
<
String
,
Long
>
o2
)
{
return
(
int
)
(
o2
.
getValue
()
-
o1
.
getValue
());
}
};
numbers
=
MapUtils
.
sortMap
(
numbers
,
compator
);
Map
<
Server
,
Long
>
servers
=
findAvaliableServers
();
routerConfig
.
setStartTime
(
period
);
routerConfig
.
setEndTime
(
new
Date
(
period
.
getTime
()
+
TimeUtil
.
ONE_DAY
));
visitor
.
visitStateReport
(
report
);
Map
<
String
,
Long
>
numbers
=
visitor
.
getNumbers
();
Comparator
<
Entry
<
String
,
Long
>>
compator
=
new
Comparator
<
Map
.
Entry
<
String
,
Long
>>()
{
processMainServer
(
servers
,
routerConfig
,
numbers
);
processBackServer
(
servers
,
routerConfig
,
numbers
);
@Override
public
int
compare
(
Entry
<
String
,
Long
>
o1
,
Entry
<
String
,
Long
>
o2
)
{
return
(
int
)
(
o2
.
getValue
()
-
o1
.
getValue
());
}
};
numbers
=
MapUtils
.
sortMap
(
numbers
,
compator
);
Map
<
Server
,
Long
>
servers
=
findAvaliableServers
();
processMainServer
(
servers
,
routerConfig
,
numbers
);
processBackServer
(
servers
,
routerConfig
,
numbers
);
}
else
{
System
.
err
.
println
(
"no need change"
);
routerConfig
=
yesterdayConfig
;
routerConfig
.
setStartTime
(
start
);
routerConfig
.
setEndTime
(
end
);
}
DailyReport
dailyReport
=
new
DailyReport
();
...
...
cat-home/src/test/java/com/dianping/cat/report/analyzer/RouterBuilderTest.java
浏览文件 @
a67d6211
...
...
@@ -3,34 +3,52 @@ package com.dianping.cat.report.analyzer;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
import
junit.framework.Assert
;
import
org.junit.Test
;
import
org.unidal.lookup.ComponentTestCase
;
import
com.dianping.cat.Constants
;
import
com.dianping.cat.core.dal.Task
;
import
com.dianping.cat.helper.TimeUtil
;
import
com.dianping.cat.home.router.entity.RouterConfig
;
import
com.dianping.cat.report.service.ReportServiceManager
;
import
com.dianping.cat.report.task.spi.ReportFacade
;
public
class
RouterBuilderTest
extends
ComponentTestCase
{
public
String
day1
=
"2014-07-25"
;
public
String
day2
=
"2014-07-26"
;
public
String
day3
=
"2014-07-27"
;
public
String
day4
=
"2014-07-28"
;
@Test
public
void
test
()
throws
Exception
{
ReportFacade
reportFacade
=
(
ReportFacade
)
lookup
(
ReportFacade
.
class
);
Task
task
=
new
Task
();
Date
reportPeriod
=
new
SimpleDateFormat
(
"yyyy-MM-dd"
).
parse
(
"2014-07-28"
);
Date
reportPeriod
=
new
SimpleDateFormat
(
"yyyy-MM-dd"
).
parse
(
day3
);
task
.
setReportName
(
Constants
.
REPORT_ROUTER
);
task
.
setReportPeriod
(
reportPeriod
);
task
.
setReportDomain
(
Constants
.
CAT
);
task
.
setTaskType
(
1
);
reportFacade
.
builderReport
(
task
);
ReportServiceManager
manager
=
(
ReportServiceManager
)
lookup
(
ReportServiceManager
.
class
);
RouterConfig
report
=
manager
.
queryRouterConfigReport
(
Constants
.
CAT
,
reportPeriod
,
new
Date
(
reportPeriod
.
getTime
()+
TimeUtil
.
ONE_DAY
));
System
.
err
.
println
(
report
);
reportFacade
.
builderReport
(
task
);
task
.
setReportPeriod
(
new
SimpleDateFormat
(
"yyyy-MM-dd"
).
parse
(
day4
));
reportFacade
.
builderReport
(
task
);
}
@Test
public
void
test1
()
throws
Exception
{
ReportServiceManager
service
=
lookup
(
ReportServiceManager
.
class
);
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyy-MM-dd"
);
RouterConfig
report1
=
service
.
queryRouterConfigReport
(
Constants
.
CAT
,
sdf
.
parse
(
day3
),
sdf
.
parse
(
day4
));
RouterConfig
report2
=
service
.
queryRouterConfigReport
(
Constants
.
CAT
,
sdf
.
parse
(
day3
),
sdf
.
parse
(
day4
));
Assert
.
assertEquals
(
report1
.
toString
(),
report2
.
toString
());
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录