Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
梦中观雨
cat
提交
23856a15
C
cat
项目概览
梦中观雨
/
cat
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
cat
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
23856a15
编写于
4月 01, 2012
作者:
F
Frankie Wu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refine BucketManager
上级
f76b04bd
变更
17
显示空白变更内容
内联
并排
Showing
17 changed file
with
80 addition
and
213 deletion
+80
-213
cat-consumer/src/main/java/com/dianping/cat/consumer/DefaultAnalyzerFactory.java
...ava/com/dianping/cat/consumer/DefaultAnalyzerFactory.java
+0
-18
cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java
...m/dianping/cat/consumer/build/ComponentsConfigurator.java
+1
-3
cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java
...n/java/com/dianping/cat/consumer/event/EventAnalyzer.java
+20
-58
cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java
...va/com/dianping/cat/consumer/problem/ProblemAnalyzer.java
+14
-49
cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java
...ianping/cat/consumer/transaction/TransactionAnalyzer.java
+14
-52
cat-consumer/src/main/resources/META-INF/plexus/components.xml
...onsumer/src/main/resources/META-INF/plexus/components.xml
+0
-3
cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java
...src/main/java/com/dianping/cat/storage/BucketManager.java
+4
-2
cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java
...m/dianping/cat/storage/internal/DefaultBucketManager.java
+9
-4
cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java
.../com/dianping/cat/storage/message/LocalLogviewBucket.java
+3
-9
cat-core/src/main/java/com/dianping/cat/storage/report/LocalReportBucket.java
...va/com/dianping/cat/storage/report/LocalReportBucket.java
+3
-3
cat-core/src/test/java/com/dianping/cat/storage/BucketConcurrentTest.java
...t/java/com/dianping/cat/storage/BucketConcurrentTest.java
+3
-3
cat-core/src/test/java/com/dianping/cat/storage/BucketManagerTest.java
...test/java/com/dianping/cat/storage/BucketManagerTest.java
+4
-4
cat-home/src/main/java/com/dianping/cat/report/page/model/event/HistoricalEventService.java
...g/cat/report/page/model/event/HistoricalEventService.java
+1
-1
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HistoricalLogViewService.java
...t/report/page/model/logview/HistoricalLogViewService.java
+1
-1
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java
...ng/cat/report/page/model/logview/LocalLogViewService.java
+1
-1
cat-home/src/main/java/com/dianping/cat/report/page/model/problem/HistoricalProblemService.java
...t/report/page/model/problem/HistoricalProblemService.java
+1
-1
cat-home/src/main/java/com/dianping/cat/report/page/model/transaction/HistoricalTransactionService.java
.../page/model/transaction/HistoricalTransactionService.java
+1
-1
未找到文件。
cat-consumer/src/main/java/com/dianping/cat/consumer/DefaultAnalyzerFactory.java
浏览文件 @
23856a15
...
...
@@ -6,35 +6,28 @@ import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import
com.dianping.cat.consumer.transaction.TransactionAnalyzer
;
import
com.dianping.cat.message.spi.MessageAnalyzer
;
import
com.site.lookup.ContainerHolder
;
import
com.site.lookup.annotation.Inject
;
/**
* @author yong.you
* @since Jan 5, 2012
*/
public
class
DefaultAnalyzerFactory
extends
ContainerHolder
implements
AnalyzerFactory
{
@Inject
private
boolean
m_local
;
@Override
public
MessageAnalyzer
create
(
String
name
,
long
start
,
long
duration
,
long
extraTime
)
{
if
(
name
.
equals
(
"problem"
))
{
ProblemAnalyzer
analyzer
=
lookup
(
ProblemAnalyzer
.
class
);
analyzer
.
setAnalyzerInfo
(
start
,
duration
,
extraTime
);
analyzer
.
setLocal
(
m_local
);
return
analyzer
;
}
else
if
(
name
.
equals
(
"transaction"
))
{
TransactionAnalyzer
analyzer
=
lookup
(
TransactionAnalyzer
.
class
);
analyzer
.
setAnalyzerInfo
(
start
,
duration
,
extraTime
);
analyzer
.
setLocal
(
m_local
);
return
analyzer
;
}
else
if
(
name
.
equals
(
"event"
))
{
EventAnalyzer
analyzer
=
lookup
(
EventAnalyzer
.
class
);
analyzer
.
setAnalyzerInfo
(
start
,
duration
,
extraTime
);
analyzer
.
setLocal
(
m_local
);
return
analyzer
;
}
else
if
(
name
.
equals
(
"ip"
))
{
IpAnalyzer
analyzer
=
lookup
(
IpAnalyzer
.
class
);
...
...
@@ -49,15 +42,4 @@ public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerF
public
void
release
(
Object
component
)
{
super
.
release
(
component
);
}
/**
* Set local mode. In local mode, all reports and log-views will only be
* stored in local disk, no reports or log-views will be stored in HDFS or
* MySQL. <p>
*
* @param local
*/
public
void
setLocal
(
boolean
local
)
{
m_local
=
local
;
}
}
cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java
浏览文件 @
23856a15
...
...
@@ -26,10 +26,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public
List
<
Component
>
defineComponents
()
{
List
<
Component
>
all
=
new
ArrayList
<
Component
>();
String
defaultLocalMode
=
"true"
;
all
.
add
(
C
(
AnalyzerFactory
.
class
,
DefaultAnalyzerFactory
.
class
)
//
.
config
(
E
(
"local"
).
value
(
property
(
"local"
,
defaultLocalMode
))));
all
.
add
(
C
(
AnalyzerFactory
.
class
,
DefaultAnalyzerFactory
.
class
));
all
.
add
(
C
(
MessageConsumer
.
class
,
"realtime"
,
RealtimeConsumer
.
class
)
//
.
req
(
AnalyzerFactory
.
class
).
config
(
E
(
"consumerId"
).
value
(
"realtime"
)
//
...
...
cat-consumer/src/main/java/com/dianping/cat/consumer/event/EventAnalyzer.java
浏览文件 @
23856a15
...
...
@@ -48,30 +48,19 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
private
long
m_duration
;
private
boolean
m_local
;
void
closeMessageBuckets
()
{
Date
timestamp
=
new
Date
(
m_startTime
);
for
(
String
domain
:
m_reports
.
keySet
())
{
Bucket
<
MessageTree
>
localBucket
=
null
;
Bucket
<
MessageTree
>
remoteBucket
=
null
;
Bucket
<
MessageTree
>
logviewBucket
=
null
;
try
{
localBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"local"
);
if
(!
m_local
)
{
remoteBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"remote"
);
}
logviewBucket
=
m_bucketManager
.
getLogviewBucket
(
new
Date
(
m_startTime
),
domain
);
}
catch
(
Exception
e
)
{
m_logger
.
error
(
String
.
format
(
"Error when getting
message
bucket of %s!"
,
timestamp
),
e
);
m_logger
.
error
(
String
.
format
(
"Error when getting
logview
bucket of %s!"
,
timestamp
),
e
);
}
finally
{
if
(
localBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
localBucket
);
}
if
(
remoteBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
remoteBucket
);
if
(
logviewBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
logviewBucket
);
}
}
}
...
...
@@ -128,22 +117,22 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
void
loadReports
()
{
Date
timestamp
=
new
Date
(
m_startTime
);
DefaultXmlParser
parser
=
new
DefaultXmlParser
();
Bucket
<
String
>
b
ucket
=
null
;
Bucket
<
String
>
reportB
ucket
=
null
;
try
{
bucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"event"
,
"local
"
);
reportBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"event
"
);
for
(
String
id
:
b
ucket
.
getIdsByPrefix
(
""
))
{
String
xml
=
b
ucket
.
findById
(
id
);
for
(
String
id
:
reportB
ucket
.
getIdsByPrefix
(
""
))
{
String
xml
=
reportB
ucket
.
findById
(
id
);
EventReport
report
=
parser
.
parse
(
xml
);
m_reports
.
put
(
report
.
getDomain
(),
report
);
}
}
catch
(
Exception
e
)
{
m_logger
.
error
(
String
.
format
(
"Error when loading
transacion
reports of %s!"
,
timestamp
),
e
);
m_logger
.
error
(
String
.
format
(
"Error when loading
event
reports of %s!"
,
timestamp
),
e
);
}
finally
{
if
(
b
ucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
b
ucket
);
if
(
reportB
ucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
reportB
ucket
);
}
}
}
...
...
@@ -254,10 +243,6 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
loadReports
();
}
public
void
setLocal
(
boolean
local
)
{
m_local
=
local
;
}
@Override
protected
void
store
(
List
<
EventReport
>
reports
)
{
if
(
reports
==
null
||
reports
.
size
()
==
0
)
{
...
...
@@ -273,47 +258,28 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
String
domain
=
tree
.
getDomain
();
try
{
Bucket
<
MessageTree
>
localBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"local"
);
localBucket
.
storeById
(
messageId
,
tree
);
if
(!
m_local
)
{
Bucket
<
MessageTree
>
remoteBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"remote"
);
Bucket
<
MessageTree
>
logviewBucket
=
m_bucketManager
.
getLogviewBucket
(
new
Date
(
m_startTime
),
domain
);
remoteBucket
.
storeById
(
messageId
,
tree
);
}
logviewBucket
.
storeById
(
messageId
,
tree
);
}
catch
(
IOException
e
)
{
m_logger
.
error
(
"Error when storing
message
for event analyzer!"
,
e
);
m_logger
.
error
(
"Error when storing
logview
for event analyzer!"
,
e
);
}
}
void
storeReports
(
Collection
<
EventReport
>
reports
)
{
Date
timestamp
=
new
Date
(
m_startTime
);
DefaultXmlBuilder
builder
=
new
DefaultXmlBuilder
(
true
);
Bucket
<
String
>
localBucket
=
null
;
Bucket
<
String
>
remoteBucket
=
null
;
Bucket
<
String
>
reportBucket
=
null
;
Transaction
t
=
Cat
.
getProducer
().
newTransaction
(
"Checkpoint"
,
getClass
().
getSimpleName
());
try
{
localBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"event"
,
"local"
);
if
(!
m_local
)
{
remoteBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"event"
,
"remote"
);
}
// delete old one, not append mode
localBucket
.
deleteAndCreate
();
reportBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"event"
);
for
(
EventReport
report
:
reports
)
{
String
xml
=
builder
.
buildXml
(
report
);
String
domain
=
report
.
getDomain
();
localBucket
.
storeById
(
domain
,
xml
);
if
(!
m_local
)
{
remoteBucket
.
storeById
(
domain
,
xml
);
}
reportBucket
.
storeById
(
domain
,
xml
);
}
t
.
setStatus
(
Message
.
SUCCESS
);
...
...
@@ -324,12 +290,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
finally
{
t
.
complete
();
if
(
localBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
localBucket
);
}
if
(
remoteBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
remoteBucket
);
if
(
reportBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
reportBucket
);
}
}
}
...
...
cat-consumer/src/main/java/com/dianping/cat/consumer/problem/ProblemAnalyzer.java
浏览文件 @
23856a15
...
...
@@ -48,30 +48,19 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
private
long
m_duration
;
private
boolean
m_local
;
void
closeMessageBuckets
()
{
Date
timestamp
=
new
Date
(
m_startTime
);
for
(
String
domain
:
m_reports
.
keySet
())
{
Bucket
<
MessageTree
>
localBucket
=
null
;
Bucket
<
MessageTree
>
remoteBucket
=
null
;
Bucket
<
MessageTree
>
logviewBucket
=
null
;
try
{
localBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"local"
);
if
(!
m_local
)
{
remoteBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"remote"
);
}
logviewBucket
=
m_bucketManager
.
getLogviewBucket
(
new
Date
(
m_startTime
),
domain
);
}
catch
(
Exception
e
)
{
m_logger
.
error
(
String
.
format
(
"Error when getting
message
bucket of %s!"
,
timestamp
),
e
);
m_logger
.
error
(
String
.
format
(
"Error when getting
logview
bucket of %s!"
,
timestamp
),
e
);
}
finally
{
if
(
localBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
localBucket
);
}
if
(
remoteBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
remoteBucket
);
if
(
logviewBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
logviewBucket
);
}
}
}
...
...
@@ -145,7 +134,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
Bucket
<
String
>
bucket
=
null
;
try
{
bucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"problem"
,
"local"
);
bucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"problem"
);
for
(
String
id
:
bucket
.
getIdsByPrefix
(
""
))
{
String
xml
=
bucket
.
findById
(
id
);
...
...
@@ -195,10 +184,6 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
loadReports
();
}
public
void
setLocal
(
boolean
local
)
{
m_local
=
local
;
}
@Override
protected
void
store
(
List
<
ProblemReport
>
reports
)
{
if
(
reports
==
null
||
reports
.
size
()
==
0
)
{
...
...
@@ -214,16 +199,9 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
String
domain
=
tree
.
getDomain
();
try
{
Bucket
<
MessageTree
>
localBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"local"
);
localBucket
.
storeById
(
messageId
,
tree
);
Bucket
<
MessageTree
>
logviewBucket
=
m_bucketManager
.
getLogviewBucket
(
new
Date
(
m_startTime
),
domain
);
if
(!
m_local
)
{
Bucket
<
MessageTree
>
remoteBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"remote"
);
remoteBucket
.
storeById
(
messageId
,
tree
);
}
logviewBucket
.
storeById
(
messageId
,
tree
);
}
catch
(
Exception
e
)
{
m_logger
.
error
(
"Error when storing message for problem analyzer!"
,
e
);
}
...
...
@@ -232,29 +210,20 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
void
storeReports
(
Collection
<
ProblemReport
>
reports
)
{
Date
timestamp
=
new
Date
(
m_startTime
);
DefaultXmlBuilder
builder
=
new
DefaultXmlBuilder
(
true
);
Bucket
<
String
>
localBucket
=
null
;
Bucket
<
String
>
remoteBucket
=
null
;
Bucket
<
String
>
reportBucket
=
null
;
Transaction
t
=
Cat
.
getProducer
().
newTransaction
(
"Checkpoint"
,
getClass
().
getSimpleName
());
try
{
localBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"problem"
,
"local"
);
if
(!
m_local
)
{
remoteBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"problem"
,
"remote"
);
}
reportBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"problem"
);
// delete old one, not append mode
local
Bucket
.
deleteAndCreate
();
report
Bucket
.
deleteAndCreate
();
for
(
ProblemReport
report
:
reports
)
{
String
xml
=
builder
.
buildXml
(
report
);
String
domain
=
report
.
getDomain
();
localBucket
.
storeById
(
domain
,
xml
);
if
(!
m_local
)
{
remoteBucket
.
storeById
(
domain
,
xml
);
}
reportBucket
.
storeById
(
domain
,
xml
);
}
t
.
setStatus
(
Message
.
SUCCESS
);
...
...
@@ -265,12 +234,8 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
finally
{
t
.
complete
();
if
(
localBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
localBucket
);
}
if
(
remoteBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
remoteBucket
);
if
(
reportBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
reportBucket
);
}
}
}
...
...
cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java
浏览文件 @
23856a15
...
...
@@ -48,30 +48,19 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
private
long
m_duration
;
private
boolean
m_local
;
void
closeMessageBuckets
()
{
Date
timestamp
=
new
Date
(
m_startTime
);
for
(
String
domain
:
m_reports
.
keySet
())
{
Bucket
<
MessageTree
>
localBucket
=
null
;
Bucket
<
MessageTree
>
remoteBucket
=
null
;
Bucket
<
MessageTree
>
logviewBucket
=
null
;
try
{
localBucket
=
m_bucketManager
.
getMessageBucket
(
timestamp
,
domain
,
"local"
);
if
(!
m_local
)
{
remoteBucket
=
m_bucketManager
.
getMessageBucket
(
timestamp
,
domain
,
"remote"
);
}
logviewBucket
=
m_bucketManager
.
getLogviewBucket
(
timestamp
,
domain
);
}
catch
(
Exception
e
)
{
m_logger
.
error
(
String
.
format
(
"Error when getting
message
bucket of %s!"
,
timestamp
),
e
);
m_logger
.
error
(
String
.
format
(
"Error when getting
logview
bucket of %s!"
,
timestamp
),
e
);
}
finally
{
if
(
localBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
localBucket
);
}
if
(
remoteBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
remoteBucket
);
if
(
logviewBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
logviewBucket
);
}
}
}
...
...
@@ -132,7 +121,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
Bucket
<
String
>
bucket
=
null
;
try
{
bucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"transaction"
,
"local"
);
bucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"transaction"
);
for
(
String
id
:
bucket
.
getIdsByPrefix
(
""
))
{
String
xml
=
bucket
.
findById
(
id
);
...
...
@@ -267,10 +256,6 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
loadReports
();
}
public
void
setLocal
(
boolean
local
)
{
m_local
=
local
;
}
@Override
protected
void
store
(
List
<
TransactionReport
>
reports
)
{
if
(
reports
==
null
||
reports
.
size
()
==
0
)
{
...
...
@@ -286,18 +271,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String
domain
=
tree
.
getDomain
();
try
{
Bucket
<
MessageTree
>
localBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"local"
);
localBucket
.
storeById
(
messageId
,
tree
);
if
(!
m_local
)
{
Bucket
<
MessageTree
>
remoteBucket
=
m_bucketManager
.
getMessageBucket
(
new
Date
(
m_startTime
),
domain
,
"remote"
);
Bucket
<
MessageTree
>
logviewBucket
=
m_bucketManager
.
getLogviewBucket
(
new
Date
(
m_startTime
),
domain
);
remoteBucket
.
storeById
(
messageId
,
tree
);
}
logviewBucket
.
storeById
(
messageId
,
tree
);
}
catch
(
IOException
e
)
{
m_logger
.
error
(
"Error when storing
message
for transaction analyzer!"
,
e
);
m_logger
.
error
(
"Error when storing
logview
for transaction analyzer!"
,
e
);
}
}
...
...
@@ -305,28 +283,16 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
Date
timestamp
=
new
Date
(
m_startTime
);
DefaultXmlBuilder
builder
=
new
DefaultXmlBuilder
(
true
);
Transaction
t
=
Cat
.
getProducer
().
newTransaction
(
"Checkpoint"
,
getClass
().
getSimpleName
());
Bucket
<
String
>
localBucket
=
null
;
Bucket
<
String
>
remoteBucket
=
null
;
Bucket
<
String
>
reportBucket
=
null
;
try
{
localBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"transaction"
,
"local"
);
if
(!
m_local
)
{
remoteBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"transaction"
,
"remote"
);
}
// delete old one, not append mode
localBucket
.
deleteAndCreate
();
reportBucket
=
m_bucketManager
.
getReportBucket
(
timestamp
,
"transaction"
);
for
(
TransactionReport
report
:
reports
)
{
String
xml
=
builder
.
buildXml
(
report
);
String
domain
=
report
.
getDomain
();
localBucket
.
storeById
(
domain
,
xml
);
if
(!
m_local
)
{
remoteBucket
.
storeById
(
domain
,
xml
);
}
reportBucket
.
storeById
(
domain
,
xml
);
}
t
.
setStatus
(
Message
.
SUCCESS
);
...
...
@@ -337,12 +303,8 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
finally
{
t
.
complete
();
if
(
localBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
localBucket
);
}
if
(
remoteBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
remoteBucket
);
if
(
reportBucket
!=
null
)
{
m_bucketManager
.
closeBucket
(
reportBucket
);
}
}
}
...
...
cat-consumer/src/main/resources/META-INF/plexus/components.xml
浏览文件 @
23856a15
...
...
@@ -3,9 +3,6 @@
<component>
<role>
com.dianping.cat.consumer.AnalyzerFactory
</role>
<implementation>
com.dianping.cat.consumer.DefaultAnalyzerFactory
</implementation>
<configuration>
<local>
true
</local>
</configuration>
</component>
<component>
<role>
com.dianping.cat.message.spi.MessageConsumer
</role>
...
...
cat-core/src/main/java/com/dianping/cat/storage/BucketManager.java
浏览文件 @
23856a15
...
...
@@ -8,7 +8,9 @@ import com.dianping.cat.message.spi.MessageTree;
public
interface
BucketManager
{
public
void
closeBucket
(
Bucket
<?>
bucket
);
public
Bucket
<
MessageTree
>
get
MessageBucket
(
Date
timestamp
,
String
domain
,
String
namespace
)
throws
IOException
;
public
Bucket
<
MessageTree
>
get
LogviewBucket
(
Date
timestamp
,
String
domain
)
throws
IOException
;
public
Bucket
<
String
>
getReportBucket
(
Date
timestamp
,
String
name
,
String
namespace
)
throws
IOException
;
public
Bucket
<
MessageTree
>
getMessageBucket
(
Date
timestamp
,
String
domain
)
throws
IOException
;
public
Bucket
<
String
>
getReportBucket
(
Date
timestamp
,
String
name
)
throws
IOException
;
}
cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java
浏览文件 @
23856a15
...
...
@@ -90,13 +90,18 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
}
@Override
public
Bucket
<
MessageTree
>
get
MessageBucket
(
Date
timestamp
,
String
domain
,
String
namespace
)
throws
IOException
{
return
getBucket
(
MessageTree
.
class
,
timestamp
,
domain
,
namespace
);
public
Bucket
<
MessageTree
>
get
LogviewBucket
(
Date
timestamp
,
String
domain
)
throws
IOException
{
return
getBucket
(
MessageTree
.
class
,
timestamp
,
domain
,
"logview"
);
}
@Override
public
Bucket
<
String
>
getReportBucket
(
Date
timestamp
,
String
name
,
String
namespace
)
throws
IOException
{
return
getBucket
(
String
.
class
,
timestamp
,
name
,
namespace
);
public
Bucket
<
MessageTree
>
getMessageBucket
(
Date
timestamp
,
String
domain
)
throws
IOException
{
return
getBucket
(
MessageTree
.
class
,
timestamp
,
domain
,
"message"
);
}
@Override
public
Bucket
<
String
>
getReportBucket
(
Date
timestamp
,
String
name
)
throws
IOException
{
return
getBucket
(
String
.
class
,
timestamp
,
name
,
"report"
);
}
static
class
Entry
{
...
...
cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java
浏览文件 @
23856a15
...
...
@@ -241,14 +241,9 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
}
protected
List
<
String
>
prepareTags
(
MessageTree
tree
)
{
List
<
String
>
tags
=
new
ArrayList
<
String
>();
List
<
String
>
tags
=
new
ArrayList
<
String
>(
1
);
tags
.
add
(
"t:"
+
tree
.
getThreadId
());
tags
.
add
(
"r:"
+
tree
.
getMessageId
());
if
(
tree
.
getSessionToken
()
!=
null
)
{
tags
.
add
(
"s:"
+
tree
.
getSessionToken
());
}
return
tags
;
}
...
...
@@ -259,14 +254,11 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
@Override
public
boolean
storeById
(
String
id
,
MessageTree
tree
)
throws
IOException
{
m_writeLock
.
lock
();
if
(
m_idToOffsets
.
containsKey
(
id
))
{
return
false
;
}
List
<
String
>
tags
=
prepareTags
(
tree
);
ChannelBuffer
buf
=
ChannelBuffers
.
dynamicBuffer
(
8192
);
m_codec
.
encode
(
tree
,
buf
);
...
...
@@ -274,6 +266,8 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
int
length
=
buf
.
readInt
();
byte
[]
num
=
String
.
valueOf
(
length
).
getBytes
(
"utf-8"
);
m_writeLock
.
lock
();
try
{
m_writeDataFile
.
write
(
num
);
m_writeDataFile
.
write
(
'\n'
);
...
...
cat-core/src/main/java/com/dianping/cat/storage/report/LocalReportBucket.java
浏览文件 @
23856a15
...
...
@@ -158,7 +158,7 @@ public class LocalReportBucket implements Bucket<String>, LogEnabled {
@Override
public
Collection
<
String
>
getIdsByPrefix
(
String
tag
)
{
throw
new
UnsupportedOperationException
(
"Not supported by local logview bucket!"
);
return
m_idToOffsets
.
keySet
(
);
}
@Override
...
...
@@ -221,8 +221,6 @@ public class LocalReportBucket implements Bucket<String>, LogEnabled {
@Override
public
boolean
storeById
(
String
id
,
String
report
)
throws
IOException
{
m_writeLock
.
lock
();
if
(
m_idToOffsets
.
containsKey
(
id
))
{
return
false
;
}
...
...
@@ -231,6 +229,8 @@ public class LocalReportBucket implements Bucket<String>, LogEnabled {
int
length
=
content
.
length
;
byte
[]
num
=
String
.
valueOf
(
length
).
getBytes
(
"utf-8"
);
m_writeLock
.
lock
();
try
{
m_writeDataFile
.
write
(
num
);
m_writeDataFile
.
write
(
'\n'
);
...
...
cat-core/src/test/java/com/dianping/cat/storage/BucketConcurrentTest.java
浏览文件 @
23856a15
...
...
@@ -50,7 +50,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
public
void
testMessageBucket
()
throws
Exception
{
Date
timestamp
=
new
Date
();
BucketManager
manager
=
lookup
(
BucketManager
.
class
);
final
Bucket
<
MessageTree
>
bucket
=
manager
.
get
MessageBucket
(
timestamp
,
"concurrent/message"
,
"local
"
);
final
Bucket
<
MessageTree
>
bucket
=
manager
.
get
LogviewBucket
(
timestamp
,
"concurrent/message
"
);
ExecutorService
pool
=
Executors
.
newFixedThreadPool
(
10
);
for
(
int
p
=
0
;
p
<
10
;
p
++)
{
...
...
@@ -115,7 +115,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
public
void
testStringBucket
()
throws
Exception
{
Date
timestamp
=
new
Date
();
BucketManager
manager
=
lookup
(
BucketManager
.
class
);
final
Bucket
<
String
>
bucket
=
manager
.
getReportBucket
(
timestamp
,
"concurrent/data"
,
"local"
);
final
Bucket
<
String
>
bucket
=
manager
.
getReportBucket
(
timestamp
,
"concurrent/data"
);
ExecutorService
pool
=
Executors
.
newFixedThreadPool
(
10
);
for
(
int
p
=
0
;
p
<
10
;
p
++)
{
...
...
@@ -143,7 +143,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
pool
.
awaitTermination
(
5000
,
TimeUnit
.
MILLISECONDS
);
final
Bucket
<
String
>
bucket2
=
manager
.
getReportBucket
(
timestamp
,
"concurrent/data"
,
"local"
);
final
Bucket
<
String
>
bucket2
=
manager
.
getReportBucket
(
timestamp
,
"concurrent/data"
);
for
(
int
p
=
0
;
p
<
10
;
p
++)
{
final
int
num
=
p
;
...
...
cat-core/src/test/java/com/dianping/cat/storage/BucketManagerTest.java
浏览文件 @
23856a15
...
...
@@ -17,10 +17,10 @@ public class BucketManagerTest extends ComponentTestCase {
public
void
test
()
throws
Exception
{
Date
timestamp
=
new
Date
();
BucketManager
manager
=
lookup
(
BucketManager
.
class
);
Bucket
<
MessageTree
>
bucket1
=
manager
.
get
MessageBucket
(
timestamp
,
"test/path1"
,
"local
"
);
Bucket
<
MessageTree
>
bucket2
=
manager
.
get
MessageBucket
(
timestamp
,
"test/path2"
,
"local
"
);
Bucket
<
MessageTree
>
bucket3
=
manager
.
get
MessageBucket
(
timestamp
,
"test/path1"
,
"local
"
);
Bucket
<
MessageTree
>
bucket4
=
manager
.
get
MessageBucket
(
timestamp
,
"test/path2"
,
"local
"
);
Bucket
<
MessageTree
>
bucket1
=
manager
.
get
LogviewBucket
(
timestamp
,
"test/path1
"
);
Bucket
<
MessageTree
>
bucket2
=
manager
.
get
LogviewBucket
(
timestamp
,
"test/path2
"
);
Bucket
<
MessageTree
>
bucket3
=
manager
.
get
LogviewBucket
(
timestamp
,
"test/path1
"
);
Bucket
<
MessageTree
>
bucket4
=
manager
.
get
LogviewBucket
(
timestamp
,
"test/path2
"
);
Assert
.
assertEquals
(
bucket1
,
bucket3
);
Assert
.
assertEquals
(
bucket2
,
bucket4
);
...
...
cat-home/src/main/java/com/dianping/cat/report/page/model/event/HistoricalEventService.java
浏览文件 @
23856a15
...
...
@@ -26,7 +26,7 @@ public class HistoricalEventService extends BaseHistoricalModelService<EventRepo
Bucket
<
String
>
bucket
=
null
;
try
{
bucket
=
m_bucketManager
.
getReportBucket
(
new
Date
(
date
),
getName
()
,
"remote"
);
bucket
=
m_bucketManager
.
getReportBucket
(
new
Date
(
date
),
getName
());
List
<
String
>
xmls
=
bucket
.
findAllById
(
domain
);
...
...
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HistoricalLogViewService.java
浏览文件 @
23856a15
...
...
@@ -32,7 +32,7 @@ public class HistoricalLogViewService extends BaseHistoricalModelService<String>
String
tag
=
request
.
getProperty
(
"tag"
);
MessageId
id
=
MessageId
.
parse
(
messageId
);
Date
timestamp
=
new
Date
(
id
.
getTimestamp
());
Bucket
<
MessageTree
>
bucket
=
m_bucketManager
.
get
MessageBucket
(
timestamp
,
id
.
getDomain
(),
"remote"
);
Bucket
<
MessageTree
>
bucket
=
m_bucketManager
.
get
LogviewBucket
(
timestamp
,
id
.
getDomain
()
);
MessageTree
tree
=
null
;
if
(
tag
!=
null
&&
direction
!=
null
)
{
...
...
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java
浏览文件 @
23856a15
...
...
@@ -34,7 +34,7 @@ public class LocalLogViewService extends BaseLocalModelService<String> {
String
tag
=
request
.
getProperty
(
"tag"
);
MessageId
id
=
MessageId
.
parse
(
messageId
);
Date
timestamp
=
new
Date
(
id
.
getTimestamp
());
Bucket
<
MessageTree
>
bucket
=
m_bucketManager
.
get
MessageBucket
(
timestamp
,
id
.
getDomain
(),
"local"
);
Bucket
<
MessageTree
>
bucket
=
m_bucketManager
.
get
LogviewBucket
(
timestamp
,
id
.
getDomain
()
);
MessageTree
tree
=
null
;
if
(
tag
!=
null
&&
direction
!=
null
)
{
...
...
cat-home/src/main/java/com/dianping/cat/report/page/model/problem/HistoricalProblemService.java
浏览文件 @
23856a15
...
...
@@ -27,7 +27,7 @@ public class HistoricalProblemService extends BaseHistoricalModelService<Problem
Bucket
<
String
>
bucket
=
null
;
try
{
bucket
=
m_bucketManager
.
getReportBucket
(
new
Date
(
date
),
getName
()
,
"remote"
);
bucket
=
m_bucketManager
.
getReportBucket
(
new
Date
(
date
),
getName
());
List
<
String
>
xmls
=
bucket
.
findAllById
(
domain
);
...
...
cat-home/src/main/java/com/dianping/cat/report/page/model/transaction/HistoricalTransactionService.java
浏览文件 @
23856a15
...
...
@@ -26,7 +26,7 @@ public class HistoricalTransactionService extends BaseHistoricalModelService<Tra
Bucket
<
String
>
bucket
=
null
;
try
{
bucket
=
m_bucketManager
.
getReportBucket
(
new
Date
(
date
),
getName
()
,
"remote"
);
bucket
=
m_bucketManager
.
getReportBucket
(
new
Date
(
date
),
getName
());
List
<
String
>
xmls
=
bucket
.
findAllById
(
domain
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录