Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
梦中观雨
cat
提交
4053f942
C
cat
项目概览
梦中观雨
/
cat
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
cat
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4053f942
编写于
8月 09, 2012
作者:
F
Frankie Wu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
bucket manager integration
上级
4663afea
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
374 addition
and
90 deletion
+374
-90
cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java
...m/dianping/cat/consumer/build/ComponentsConfigurator.java
+2
-1
cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpAnalyzer.java
...ain/java/com/dianping/cat/consumer/dump/DumpAnalyzer.java
+14
-7
cat-consumer/src/main/resources/META-INF/plexus/components.xml
...onsumer/src/main/resources/META-INF/plexus/components.xml
+1
-0
cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucket.java
...ava/com/dianping/cat/storage/dump/LocalMessageBucket.java
+22
-4
cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java
.../dianping/cat/storage/dump/LocalMessageBucketManager.java
+86
-41
cat-core/src/test/java/com/dianping/cat/message/spi/internal/DefaultMessagePathBuilderTest.java
...t/message/spi/internal/DefaultMessagePathBuilderTest.java
+35
-0
cat-core/src/test/java/com/dianping/cat/storage/dump/LocalMessageBucketTest.java
...com/dianping/cat/storage/dump/LocalMessageBucketTest.java
+3
-1
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManager.java
...om/dianping/cat/hadoop/hdfs/HdfsMessageBucketManager.java
+52
-30
cat-home/src/main/java/com/dianping/cat/build/ServiceComponentConfigurator.java
.../com/dianping/cat/build/ServiceComponentConfigurator.java
+9
-2
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HistoricalLogViewService.java
...t/report/page/model/logview/HistoricalLogViewService.java
+14
-0
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HistoricalMessageService.java
...t/report/page/model/logview/HistoricalMessageService.java
+78
-0
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java
...ng/cat/report/page/model/logview/LocalLogViewService.java
+14
-0
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalMessageService.java
...ng/cat/report/page/model/logview/LocalMessageService.java
+20
-4
cat-home/src/main/resources/META-INF/plexus/components.xml
cat-home/src/main/resources/META-INF/plexus/components.xml
+24
-0
未找到文件。
cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java
浏览文件 @
4053f942
...
...
@@ -38,6 +38,7 @@ import com.dianping.cat.message.spi.MessageCodec;
import
com.dianping.cat.message.spi.MessageConsumer
;
import
com.dianping.cat.message.spi.MessagePathBuilder
;
import
com.dianping.cat.storage.BucketManager
;
import
com.dianping.cat.storage.dump.LocalMessageBucketManager
;
import
com.dianping.cat.storage.dump.MessageBucketManager
;
import
com.site.initialization.Module
;
import
com.site.lookup.configuration.AbstractResourceConfigurator
;
...
...
@@ -95,7 +96,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all
.
add
(
C
(
DumpAnalyzer
.
class
).
is
(
PER_LOOKUP
)
//
.
req
(
ServerConfigManager
.
class
,
MessagePathBuilder
.
class
)
//
.
req
(
DumpUploader
.
class
,
DumpChannelManager
.
class
)
//
.
req
(
MessageBucketManager
.
class
));
.
req
(
MessageBucketManager
.
class
,
LocalMessageBucketManager
.
ID
));
all
.
add
(
C
(
DumpChannelManager
.
class
)
//
.
req
(
MessageCodec
.
class
,
"plain-text"
));
...
...
cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpAnalyzer.java
浏览文件 @
4053f942
...
...
@@ -16,6 +16,7 @@ import com.dianping.cat.message.internal.MessageId;
import
com.dianping.cat.message.spi.AbstractMessageAnalyzer
;
import
com.dianping.cat.message.spi.MessagePathBuilder
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.storage.dump.LocalMessageBucketManager
;
import
com.dianping.cat.storage.dump.MessageBucketManager
;
import
com.site.lookup.annotation.Inject
;
...
...
@@ -27,13 +28,13 @@ public class DumpAnalyzer extends AbstractMessageAnalyzer<Object> implements Ini
private
MessagePathBuilder
m_builder
;
@Inject
private
DumpChannelManager
m_
m
anager
;
private
DumpChannelManager
m_
channelM
anager
;
@Inject
private
DumpUploader
m_uploader
;
@Inject
private
MessageBucketManager
m_bucketManager
;
@Inject
(
type
=
MessageBucketManager
.
class
,
value
=
LocalMessageBucketManager
.
ID
)
private
Local
MessageBucketManager
m_bucketManager
;
public
DumpUploader
getDumpUploader
()
{
return
m_uploader
;
...
...
@@ -52,7 +53,13 @@ public class DumpAnalyzer extends AbstractMessageAnalyzer<Object> implements Ini
@Override
public
void
doCheckpoint
(
boolean
atEnd
)
{
if
(
atEnd
)
{
m_manager
.
closeAllChannels
(
m_startTime
);
m_channelManager
.
closeAllChannels
(
m_startTime
);
try
{
m_bucketManager
.
archive
(
m_startTime
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
}
}
...
...
@@ -95,13 +102,13 @@ public class DumpAnalyzer extends AbstractMessageAnalyzer<Object> implements Ini
long
timestamp
=
tree
.
getMessage
().
getTimestamp
();
String
domain
=
tree
.
getDomain
();
String
path
=
m_builder
.
getMessagePath
(
domain
+
"-"
+
ipAddress
,
new
Date
(
timestamp
));
DumpChannel
channel
=
m_
m
anager
.
openChannel
(
path
,
false
,
m_startTime
);
DumpChannel
channel
=
m_
channelM
anager
.
openChannel
(
path
,
false
,
m_startTime
);
int
length
=
channel
.
write
(
tree
);
if
(
length
<=
0
)
{
m_
m
anager
.
closeChannel
(
channel
);
m_
channelM
anager
.
closeChannel
(
channel
);
channel
=
m_
m
anager
.
openChannel
(
path
,
true
,
m_startTime
);
channel
=
m_
channelM
anager
.
openChannel
(
path
,
true
,
m_startTime
);
channel
.
write
(
tree
);
}
}
catch
(
Exception
e
)
{
...
...
cat-consumer/src/main/resources/META-INF/plexus/components.xml
浏览文件 @
4053f942
...
...
@@ -179,6 +179,7 @@
</requirement>
<requirement>
<role>
com.dianping.cat.storage.dump.MessageBucketManager
</role>
<role-hint>
local
</role-hint>
</requirement>
</requirements>
</component>
...
...
cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucket.java
浏览文件 @
4053f942
...
...
@@ -19,6 +19,7 @@ import com.dianping.cat.message.internal.MessageId;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.message.spi.internal.DefaultMessageTree
;
import
com.site.helper.Files
;
import
com.site.lookup.annotation.Inject
;
public
class
LocalMessageBucket
implements
MessageBucket
{
...
...
@@ -38,7 +39,7 @@ public class LocalMessageBucket implements MessageBucket {
private
int
m_rawSize
;
private
File
m_dataFile
;
private
String
m_dataFile
;
private
long
m_lastAccessTime
;
...
...
@@ -92,9 +93,12 @@ public class LocalMessageBucket implements MessageBucket {
@Override
public
void
initialize
(
String
dataFile
)
throws
IOException
{
m_dataFile
=
new
File
(
m_baseDir
,
dataFile
);
m_writer
=
new
MessageBlockWriter
(
m_dataFile
);
m_reader
=
new
MessageBlockReader
(
m_dataFile
);
m_dataFile
=
dataFile
;
File
file
=
new
File
(
m_baseDir
,
dataFile
);
m_writer
=
new
MessageBlockWriter
(
file
);
m_reader
=
new
MessageBlockReader
(
file
);
}
public
void
setBaseDir
(
File
baseDir
)
{
...
...
@@ -252,4 +256,18 @@ public class LocalMessageBucket implements MessageBucket {
m_blockSize
+=
data
.
length
+
4
;
}
}
public
void
archive
()
throws
IOException
{
File
outbox
=
new
File
(
m_baseDir
,
"outbox"
);
File
from
=
new
File
(
m_baseDir
,
m_dataFile
);
File
to
=
new
File
(
outbox
,
m_dataFile
);
File
fromIndex
=
new
File
(
m_baseDir
,
m_dataFile
+
".idx"
);
File
toIndex
=
new
File
(
outbox
,
m_dataFile
+
".idx"
);
to
.
getParentFile
().
mkdirs
();
Files
.
forDir
().
copyFile
(
from
,
to
);
Files
.
forDir
().
copyFile
(
fromIndex
,
toIndex
);
Files
.
forDir
().
delete
(
from
);
Files
.
forDir
().
delete
(
fromIndex
);
}
}
cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java
浏览文件 @
4053f942
...
...
@@ -11,7 +11,11 @@ import java.util.Map;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable
;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException
;
import
com.dianping.cat.Cat
;
import
com.dianping.cat.configuration.ServerConfigManager
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.MessageProducer
;
import
com.dianping.cat.message.Transaction
;
import
com.dianping.cat.message.internal.MessageId
;
import
com.dianping.cat.message.spi.MessagePathBuilder
;
import
com.dianping.cat.message.spi.MessageTree
;
...
...
@@ -35,13 +39,35 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
private
Map
<
String
,
LocalMessageBucket
>
m_buckets
=
new
HashMap
<
String
,
LocalMessageBucket
>();
public
void
archive
(
long
startTime
)
throws
IOException
{
String
path
=
m_pathBuilder
.
getPath
(
new
Date
(
startTime
),
""
);
List
<
String
>
keys
=
new
ArrayList
<
String
>();
synchronized
(
m_buckets
)
{
for
(
String
key
:
m_buckets
.
keySet
())
{
if
(
key
.
startsWith
(
path
))
{
keys
.
add
(
key
);
}
}
for
(
String
key
:
keys
)
{
LocalMessageBucket
bucket
=
m_buckets
.
remove
(
key
);
bucket
.
close
();
bucket
.
archive
();
}
}
}
@Override
public
void
close
()
throws
IOException
{
for
(
LocalMessageBucket
bucket
:
m_buckets
.
values
())
{
bucket
.
close
();
}
synchronized
(
m_buckets
)
{
for
(
LocalMessageBucket
bucket
:
m_buckets
.
values
())
{
bucket
.
close
();
}
m_buckets
.
clear
();
m_buckets
.
clear
();
}
}
void
closeIdleBuckets
()
throws
IOException
{
...
...
@@ -49,17 +75,19 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
long
hour
=
3600
*
1000L
;
List
<
String
>
closedKeys
=
new
ArrayList
<
String
>();
for
(
Map
.
Entry
<
String
,
LocalMessageBucket
>
e
:
m_buckets
.
entrySet
())
{
LocalMessageBucket
bucket
=
e
.
getValue
();
synchronized
(
m_buckets
)
{
for
(
Map
.
Entry
<
String
,
LocalMessageBucket
>
e
:
m_buckets
.
entrySet
())
{
LocalMessageBucket
bucket
=
e
.
getValue
();
if
(
now
-
bucket
.
getLastAccessTime
()
>=
hour
)
{
bucket
.
close
();
closedKeys
.
add
(
e
.
getKey
());
if
(
now
-
bucket
.
getLastAccessTime
()
>=
hour
)
{
bucket
.
close
();
closedKeys
.
add
(
e
.
getKey
());
}
}
}
for
(
String
key
:
closedKeys
)
{
m_buckets
.
remove
(
key
);
for
(
String
key
:
closedKeys
)
{
m_buckets
.
remove
(
key
);
}
}
}
...
...
@@ -74,43 +102,60 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
@Override
public
MessageTree
loadMessage
(
String
messageId
)
throws
IOException
{
MessageId
id
=
MessageId
.
parse
(
messageId
);
final
String
path
=
m_pathBuilder
.
getPath
(
new
Date
(
id
.
getTimestamp
()),
""
);
final
File
dir
=
new
File
(
m_baseDir
,
path
);
final
String
key
=
"-"
+
id
.
getDomain
()
+
"-"
;
final
List
<
String
>
paths
=
new
ArrayList
<
String
>();
Scanners
.
forDir
().
scan
(
dir
,
new
FileMatcher
()
{
@Override
public
Direction
matches
(
File
base
,
String
name
)
{
if
(
name
.
contains
(
key
)
&&
!
name
.
endsWith
(
".idx"
))
{
paths
.
add
(
path
+
name
);
MessageProducer
cat
=
Cat
.
getProducer
();
Transaction
t
=
cat
.
newTransaction
(
"BucketService"
,
getClass
().
getSimpleName
());
t
.
setStatus
(
Message
.
SUCCESS
);
try
{
MessageId
id
=
MessageId
.
parse
(
messageId
);
final
String
path
=
m_pathBuilder
.
getPath
(
new
Date
(
id
.
getTimestamp
()),
""
);
final
File
dir
=
new
File
(
m_baseDir
,
path
);
final
String
key
=
"-"
+
id
.
getDomain
()
+
"-"
;
final
List
<
String
>
paths
=
new
ArrayList
<
String
>();
Scanners
.
forDir
().
scan
(
dir
,
new
FileMatcher
()
{
@Override
public
Direction
matches
(
File
base
,
String
name
)
{
if
(
name
.
contains
(
key
)
&&
!
name
.
endsWith
(
".idx"
))
{
paths
.
add
(
path
+
name
);
}
return
Direction
.
NEXT
;
}
});
return
Direction
.
NEXT
;
}
});
for
(
String
dataFile
:
paths
)
{
LocalMessageBucket
bucket
=
m_buckets
.
get
(
dataFile
);
for
(
String
dataFile
:
paths
)
{
LocalMessageBucket
bucket
=
m_buckets
.
get
(
dataFile
);
if
(
bucket
==
null
)
{
File
file
=
new
File
(
m_baseDir
,
dataFile
);
if
(
bucket
==
null
)
{
File
file
=
new
File
(
m_baseDir
,
dataFile
);
if
(
file
.
exists
())
{
bucket
=
(
LocalMessageBucket
)
lookup
(
MessageBucket
.
class
,
LocalMessageBucket
.
ID
);
bucket
.
setBaseDir
(
m_baseDir
);
bucket
.
initialize
(
dataFile
);
m_buckets
.
put
(
dataFile
,
bucket
);
}
}
if
(
file
.
exists
())
{
bucket
=
(
LocalMessageBucket
)
lookup
(
MessageBucket
.
class
,
LocalMessageBucket
.
ID
);
bucket
.
setBaseDir
(
m_baseDir
);
bucket
.
initialize
(
dataFile
);
m_buckets
.
put
(
dataFile
,
bucket
);
if
(
bucket
!=
null
)
{
return
bucket
.
findById
(
messageId
);
}
}
if
(
bucket
!=
null
)
{
return
bucket
.
findById
(
messageId
);
}
return
null
;
}
catch
(
IOException
e
)
{
t
.
setStatus
(
e
);
cat
.
logError
(
e
);
throw
e
;
}
catch
(
RuntimeException
e
)
{
t
.
setStatus
(
e
);
cat
.
logError
(
e
);
throw
e
;
}
finally
{
t
.
complete
();
}
return
null
;
}
public
void
setBaseDir
(
File
baseDir
)
{
...
...
@@ -150,7 +195,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
try
{
closeIdleBuckets
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
(
);
Cat
.
getProducer
().
logError
(
e
);
}
}
}
catch
(
InterruptedException
e
)
{
...
...
cat-core/src/test/java/com/dianping/cat/message/spi/internal/DefaultMessagePathBuilderTest.java
浏览文件 @
4053f942
package
com.dianping.cat.message.spi.internal
;
import
java.text.MessageFormat
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
import
java.util.List
;
import
junit.framework.Assert
;
import
org.junit.Test
;
import
com.site.helper.Splitters
;
public
class
DefaultMessagePathBuilderTest
{
private
String
convertToHex
(
String
ip
)
{
List
<
String
>
items
=
Splitters
.
by
(
"."
).
noEmptyItem
().
split
(
ip
);
byte
[]
bytes
=
new
byte
[
4
];
for
(
int
i
=
0
;
i
<
4
;
i
++)
{
bytes
[
i
]
=
(
byte
)
Integer
.
parseInt
(
items
.
get
(
i
));
}
StringBuilder
sb
=
new
StringBuilder
(
bytes
.
length
/
2
);
for
(
byte
b
:
bytes
)
{
sb
.
append
(
Integer
.
toHexString
((
b
>>
4
)
&
0x0F
));
sb
.
append
(
Integer
.
toHexString
(
b
&
0x0F
));
}
return
sb
.
toString
();
}
@Test
public
void
testParseMessageIdFromPath
()
throws
Exception
{
MessageFormat
format
=
new
MessageFormat
(
"{0,date,yyyyMMdd'/'HH}/{1}"
);
String
path
=
"20120807/14/Cat-Cat-192.168.64.153"
;
Object
[]
objects
=
format
.
parse
(
path
);
Date
timestamp
=
(
Date
)
objects
[
0
];
List
<
String
>
parts
=
Splitters
.
by
(
'-'
).
split
((
String
)
objects
[
1
]);
String
domain
=
parts
.
get
(
1
);
String
ip
=
parts
.
get
(
2
);
String
id
=
domain
+
"-"
+
convertToHex
(
ip
)
+
"-"
+
timestamp
.
getTime
()
/
3600000L
+
"-0"
;
Assert
.
assertEquals
(
"Cat-c0a84099-373422-0"
,
id
);
}
@Test
public
void
testRemotePathBuilder
()
{
...
...
cat-core/src/test/java/com/dianping/cat/storage/dump/LocalMessageBucketTest.java
浏览文件 @
4053f942
...
...
@@ -51,6 +51,7 @@ public class LocalMessageBucketTest extends ComponentTestCase {
}
bucket
.
close
();
bucket
.
archive
();
}
@Test
...
...
@@ -91,7 +92,8 @@ public class LocalMessageBucketTest extends ComponentTestCase {
LocalMessageBucket
bucket
=
(
LocalMessageBucket
)
lookup
(
MessageBucket
.
class
,
LocalMessageBucket
.
ID
);
bucket
.
setMessageCodec
(
MockCodec
.
INSTANCE
);
bucket
.
initialize
(
"target/bucket/hdfs/dump/dump"
+
id
);
bucket
.
setBaseDir
(
new
File
(
"target/bucket/hdfs/dump"
));
bucket
.
initialize
(
"dump"
+
id
);
factory
.
setIpAddress
(
"7f000001"
);
factory
.
initialize
(
"Test"
);
return
bucket
;
...
...
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManager.java
浏览文件 @
4053f942
...
...
@@ -13,6 +13,10 @@ import org.apache.hadoop.fs.PathFilter;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable
;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException
;
import
com.dianping.cat.Cat
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.MessageProducer
;
import
com.dianping.cat.message.Transaction
;
import
com.dianping.cat.message.internal.MessageId
;
import
com.dianping.cat.message.spi.MessagePathBuilder
;
import
com.dianping.cat.message.spi.MessageTree
;
...
...
@@ -59,46 +63,64 @@ public class HdfsMessageBucketManager extends ContainerHolder implements Message
@Override
public
MessageTree
loadMessage
(
String
messageId
)
throws
IOException
{
MessageId
id
=
MessageId
.
parse
(
messageId
);
final
String
path
=
m_pathBuilder
.
getPath
(
new
Date
(
id
.
getTimestamp
()),
""
);
final
StringBuilder
sb
=
new
StringBuilder
();
FileSystem
fs
=
m_manager
.
getFileSystem
(
"dump"
,
sb
);
MessageProducer
cat
=
Cat
.
getProducer
();
Transaction
t
=
cat
.
newTransaction
(
"BucketService"
,
getClass
().
getSimpleName
());
sb
.
append
(
'/'
).
append
(
path
);
t
.
setStatus
(
Message
.
SUCCESS
);
final
String
key
=
"-"
+
id
.
getDomain
()
+
"-"
;
final
String
str
=
sb
.
toString
();
final
Path
basePath
=
new
Path
(
str
);
final
List
<
String
>
paths
=
new
ArrayList
<
String
>();
try
{
MessageId
id
=
MessageId
.
parse
(
messageId
);
final
String
path
=
m_pathBuilder
.
getPath
(
new
Date
(
id
.
getTimestamp
()),
""
);
final
StringBuilder
sb
=
new
StringBuilder
();
FileSystem
fs
=
m_manager
.
getFileSystem
(
"dump"
,
sb
);
fs
.
listStatus
(
basePath
,
new
PathFilter
()
{
@Override
public
boolean
accept
(
Path
p
)
{
String
name
=
p
.
getName
();
sb
.
append
(
'/'
).
append
(
path
);
if
(
name
.
contains
(
key
)
&&
!
name
.
endsWith
(
".idx"
))
{
paths
.
add
(
path
+
name
);
final
String
key
=
"-"
+
id
.
getDomain
()
+
"-"
;
final
String
str
=
sb
.
toString
();
final
Path
basePath
=
new
Path
(
str
);
final
List
<
String
>
paths
=
new
ArrayList
<
String
>();
fs
.
listStatus
(
basePath
,
new
PathFilter
()
{
@Override
public
boolean
accept
(
Path
p
)
{
String
name
=
p
.
getName
();
if
(
name
.
contains
(
key
)
&&
!
name
.
endsWith
(
".idx"
))
{
paths
.
add
(
path
+
name
);
}
return
false
;
}
});
return
false
;
}
});
for
(
String
dataFile
:
paths
)
{
HdfsMessageBucket
bucket
=
m_buckets
.
get
(
dataFile
);
if
(
bucket
==
null
)
{
bucket
=
(
HdfsMessageBucket
)
lookup
(
MessageBucket
.
class
,
HdfsMessageBucket
.
ID
);
bucket
.
initialize
(
dataFile
);
m_buckets
.
put
(
dataFile
,
bucket
);
}
for
(
String
dataFile
:
paths
)
{
HdfsMessageBucket
bucket
=
m_buckets
.
get
(
dataFile
);
if
(
bucket
==
null
)
{
bucket
=
(
HdfsMessageBucket
)
lookup
(
MessageBucket
.
class
,
HdfsMessageBucket
.
ID
);
bucket
.
initialize
(
dataFile
);
m_buckets
.
put
(
dataFile
,
bucket
);
}
if
(
bucket
!=
null
)
{
return
bucket
.
findById
(
messageId
);
if
(
bucket
!=
null
)
{
return
bucket
.
findById
(
messageId
);
}
}
return
null
;
}
catch
(
IOException
e
)
{
t
.
setStatus
(
e
);
cat
.
logError
(
e
);
throw
e
;
}
catch
(
RuntimeException
e
)
{
t
.
setStatus
(
e
);
cat
.
logError
(
e
);
throw
e
;
}
finally
{
t
.
complete
();
}
return
null
;
}
@Override
...
...
cat-home/src/main/java/com/dianping/cat/build/ServiceComponentConfigurator.java
浏览文件 @
4053f942
...
...
@@ -6,6 +6,7 @@ import java.util.List;
import
com.dianping.cat.configuration.ServerConfigManager
;
import
com.dianping.cat.hadoop.dal.LogviewDao
;
import
com.dianping.cat.hadoop.dal.ReportDao
;
import
com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManager
;
import
com.dianping.cat.hadoop.hdfs.InputChannelManager
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageConsumer
;
...
...
@@ -20,6 +21,7 @@ import com.dianping.cat.report.page.model.ip.HistoricalIpService;
import
com.dianping.cat.report.page.model.ip.LocalIpService
;
import
com.dianping.cat.report.page.model.logview.CompositeLogViewService
;
import
com.dianping.cat.report.page.model.logview.HistoricalLogViewService
;
import
com.dianping.cat.report.page.model.logview.HistoricalMessageService
;
import
com.dianping.cat.report.page.model.logview.LocalLogViewService
;
import
com.dianping.cat.report.page.model.logview.LocalMessageService
;
import
com.dianping.cat.report.page.model.matrix.CompositeMatrixService
;
...
...
@@ -33,6 +35,7 @@ import com.dianping.cat.report.page.model.transaction.CompositeTransactionServic
import
com.dianping.cat.report.page.model.transaction.HistoricalTransactionService
;
import
com.dianping.cat.report.page.model.transaction.LocalTransactionService
;
import
com.dianping.cat.storage.BucketManager
;
import
com.dianping.cat.storage.dump.LocalMessageBucketManager
;
import
com.dianping.cat.storage.dump.MessageBucketManager
;
import
com.site.lookup.configuration.AbstractResourceConfigurator
;
import
com.site.lookup.configuration.Component
;
...
...
@@ -98,8 +101,12 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
all
.
add
(
C
(
ModelService
.
class
,
"message-local"
,
LocalMessageService
.
class
)
//
.
req
(
MessageConsumer
.
class
,
"realtime"
)
//
.
req
(
MessageBucketManager
.
class
)
//
.
req
(
MessageBucketManager
.
class
,
LocalMessageBucketManager
.
ID
)
//
.
req
(
MessageCodec
.
class
,
"html"
));
all
.
add
(
C
(
ModelService
.
class
,
"message-historical"
,
HistoricalMessageService
.
class
)
//
.
req
(
MessageBucketManager
.
class
,
LocalMessageBucketManager
.
ID
,
"m_localBucketManager"
)
//
.
req
(
MessageBucketManager
.
class
,
HdfsMessageBucketManager
.
ID
,
"m_hdfsBucketManager"
)
//
.
req
(
MessageCodec
.
class
,
"html"
));
all
.
add
(
C
(
ModelService
.
class
,
"logview-local"
,
LocalLogViewService
.
class
)
//
.
req
(
MessageConsumer
.
class
,
"realtime"
)
//
...
...
@@ -110,7 +117,7 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
.
req
(
MessageCodec
.
class
,
"html"
));
all
.
add
(
C
(
ModelService
.
class
,
"logview"
,
CompositeLogViewService
.
class
)
//
.
req
(
ServerConfigManager
.
class
)
//
.
req
(
ModelService
.
class
,
new
String
[]
{
"logview-historical"
},
"m_services"
));
.
req
(
ModelService
.
class
,
new
String
[]
{
"
message-historical"
,
"
logview-historical"
},
"m_services"
));
return
all
;
}
...
...
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HistoricalLogViewService.java
浏览文件 @
4053f942
...
...
@@ -112,6 +112,20 @@ public class HistoricalLogViewService extends BaseHistoricalModelService<String>
}
}
@Override
public
boolean
isEligable
(
ModelRequest
request
)
{
boolean
eligibale
=
super
.
isEligable
(
request
);
if
(
eligibale
)
{
String
messageId
=
request
.
getProperty
(
"messageId"
);
MessageId
id
=
MessageId
.
parse
(
messageId
);
return
id
.
getVersion
()
==
1
;
}
return
eligibale
;
}
private
MessageTree
readMessageTree
(
Logview
logview
)
throws
IOException
{
InputChannel
inputChannel
=
null
;
...
...
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/HistoricalMessageService.java
0 → 100644
浏览文件 @
4053f942
package
com.dianping.cat.report.page.model.logview
;
import
java.nio.charset.Charset
;
import
org.jboss.netty.buffer.ChannelBuffer
;
import
org.jboss.netty.buffer.ChannelBuffers
;
import
com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManager
;
import
com.dianping.cat.message.internal.MessageId
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.message.spi.codec.HtmlMessageCodec
;
import
com.dianping.cat.report.page.model.spi.ModelPeriod
;
import
com.dianping.cat.report.page.model.spi.ModelRequest
;
import
com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService
;
import
com.dianping.cat.storage.dump.LocalMessageBucketManager
;
import
com.dianping.cat.storage.dump.MessageBucketManager
;
import
com.site.lookup.annotation.Inject
;
public
class
HistoricalMessageService
extends
BaseLocalModelService
<
String
>
{
@Inject
(
LocalMessageBucketManager
.
ID
)
private
MessageBucketManager
m_localBucketManager
;
@Inject
(
HdfsMessageBucketManager
.
ID
)
private
MessageBucketManager
m_hdfsBucketManager
;
@Inject
(
HtmlMessageCodec
.
ID
)
private
MessageCodec
m_codec
;
public
HistoricalMessageService
()
{
super
(
"logview"
);
}
@Override
protected
String
getReport
(
ModelRequest
request
,
ModelPeriod
period
,
String
domain
)
throws
Exception
{
String
messageId
=
request
.
getProperty
(
"messageId"
);
if
(
messageId
==
null
)
{
return
null
;
}
MessageTree
tree
=
m_localBucketManager
.
loadMessage
(
messageId
);
if
(
tree
!=
null
)
{
return
toString
(
tree
);
}
tree
=
m_hdfsBucketManager
.
loadMessage
(
messageId
);
if
(
tree
!=
null
)
{
return
toString
(
tree
);
}
else
{
return
null
;
}
}
@Override
public
boolean
isEligable
(
ModelRequest
request
)
{
boolean
eligibale
=
!
request
.
getPeriod
().
isCurrent
();
if
(
eligibale
)
{
String
messageId
=
request
.
getProperty
(
"messageId"
);
MessageId
id
=
MessageId
.
parse
(
messageId
);
return
id
.
getVersion
()
==
2
;
}
return
eligibale
;
}
private
String
toString
(
MessageTree
tree
)
{
ChannelBuffer
buf
=
ChannelBuffers
.
dynamicBuffer
(
8192
);
m_codec
.
encode
(
tree
,
buf
);
buf
.
readInt
();
// get rid of length
return
buf
.
toString
(
Charset
.
forName
(
"utf-8"
));
}
}
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalLogViewService.java
浏览文件 @
4053f942
...
...
@@ -84,4 +84,18 @@ public class LocalLogViewService extends BaseLocalModelService<String> {
return
null
;
}
}
@Override
public
boolean
isEligable
(
ModelRequest
request
)
{
boolean
eligibale
=
super
.
isEligable
(
request
);
if
(
eligibale
)
{
String
messageId
=
request
.
getProperty
(
"messageId"
);
MessageId
id
=
MessageId
.
parse
(
messageId
);
return
id
.
getVersion
()
==
1
;
}
return
eligibale
;
}
}
cat-home/src/main/java/com/dianping/cat/report/page/model/logview/LocalMessageService.java
浏览文件 @
4053f942
...
...
@@ -5,19 +5,21 @@ import java.nio.charset.Charset;
import
org.jboss.netty.buffer.ChannelBuffer
;
import
org.jboss.netty.buffer.ChannelBuffers
;
import
com.dianping.cat.message.internal.MessageId
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.report.page.model.spi.ModelPeriod
;
import
com.dianping.cat.report.page.model.spi.ModelRequest
;
import
com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService
;
import
com.dianping.cat.storage.dump.LocalMessageBucketManager
;
import
com.dianping.cat.storage.dump.MessageBucketManager
;
import
com.site.lookup.annotation.Inject
;
public
class
LocalMessageService
extends
BaseLocalModelService
<
String
>
{
@Inject
@Inject
(
LocalMessageBucketManager
.
ID
)
private
MessageBucketManager
m_bucketManager
;
@Inject
(
value
=
"html"
)
@Inject
(
"html"
)
private
MessageCodec
m_codec
;
public
LocalMessageService
()
{
...
...
@@ -40,8 +42,22 @@ public class LocalMessageService extends BaseLocalModelService<String> {
m_codec
.
encode
(
tree
,
buf
);
buf
.
readInt
();
// get rid of length
return
buf
.
toString
(
Charset
.
forName
(
"utf-8"
));
}
else
{
return
null
;
}
return
null
;
}
@Override
public
boolean
isEligable
(
ModelRequest
request
)
{
boolean
eligibale
=
request
.
getPeriod
().
isCurrent
();
if
(
eligibale
)
{
String
messageId
=
request
.
getProperty
(
"messageId"
);
MessageId
id
=
MessageId
.
parse
(
messageId
);
return
id
.
getVersion
()
==
2
;
}
return
eligibale
;
}
}
cat-home/src/main/resources/META-INF/plexus/components.xml
浏览文件 @
4053f942
...
...
@@ -465,6 +465,28 @@
</requirement>
<requirement>
<role>
com.dianping.cat.storage.dump.MessageBucketManager
</role>
<role-hint>
local
</role-hint>
</requirement>
<requirement>
<role>
com.dianping.cat.message.spi.MessageCodec
</role>
<role-hint>
html
</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>
com.dianping.cat.report.page.model.spi.ModelService
</role>
<role-hint>
message-historical
</role-hint>
<implementation>
com.dianping.cat.report.page.model.logview.HistoricalMessageService
</implementation>
<requirements>
<requirement>
<role>
com.dianping.cat.storage.dump.MessageBucketManager
</role>
<role-hint>
local
</role-hint>
<field-name>
m_localBucketManager
</field-name>
</requirement>
<requirement>
<role>
com.dianping.cat.storage.dump.MessageBucketManager
</role>
<role-hint>
hdfs
</role-hint>
<field-name>
m_hdfsBucketManager
</field-name>
</requirement>
<requirement>
<role>
com.dianping.cat.message.spi.MessageCodec
</role>
...
...
@@ -521,6 +543,7 @@
<requirement>
<role>
com.dianping.cat.report.page.model.spi.ModelService
</role>
<role-hints>
<role-hint>
message-historical
</role-hint>
<role-hint>
logview-historical
</role-hint>
</role-hints>
<field-name>
m_services
</field-name>
...
...
@@ -999,6 +1022,7 @@
<requirements>
<requirement>
<role>
com.dianping.cat.storage.dump.MessageBucketManager
</role>
<role-hint>
local
</role-hint>
</requirement>
<requirement>
<role>
com.dianping.cat.message.spi.MessageCodec
</role>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录