Commit e03d6b6b
Authored Feb 18, 2012 by Frankie Wu

remove cat-hadoop

Parent: 58ca1bcb
Changes: 16
Showing 16 changed files with 0 additions and 1057 deletions (+0 -1057)
cat-hadoop/pom.xml  +0 -69
cat-hadoop/src/main/java/com/dianping/cat/hadoop/HdfsDumpConsumer.java  +0 -37
cat-hadoop/src/main/java/com/dianping/cat/hadoop/build/ComponentsConfigurator.java  +0 -51
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/DefaultOutputChannel.java  +0 -83
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/DefaultOutputChannelManager.java  +0 -154
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageStorage.java  +0 -141
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/MessageBucket.java  +0 -29
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/OutputChannel.java  +0 -36
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/OutputChannelManager.java  +0 -15
cat-hadoop/src/main/java/com/dianping/cat/hadoop/job/BrowserAnalyzer.java  +0 -79
cat-hadoop/src/main/java/com/dianping/cat/hadoop/mapreduce/MessageTreeInputFormat.java  +0 -25
cat-hadoop/src/main/java/com/dianping/cat/hadoop/mapreduce/MessageTreeReader.java  +0 -187
cat-hadoop/src/main/java/com/dianping/cat/hadoop/mapreduce/MessageTreeWritable.java  +0 -30
cat-hadoop/src/main/resources/META-INF/plexus/components.xml  +0 -48
cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/CatTestCase.java  +0 -21
cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/HdfsMessageStorageTest.java  +0 -52
cat-hadoop/pom.xml
Deleted · 100644 → 0 · View file @ 58ca1bcb

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <parent>
    <groupId>com.dianping.cat</groupId>
    <artifactId>parent</artifactId>
    <version>0.1.0</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>
  <artifactId>cat-hadoop</artifactId>
  <name>CAT Hadoop Analysis</name>
  <dependencies>
    <dependency>
      <groupId>com.dianping.cat</groupId>
      <artifactId>cat-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.203.0</version>
    </dependency>
    <dependency>
      <groupId>org.codehaus.jackson</groupId>
      <artifactId>jackson-mapper-asl</artifactId>
      <version>1.9.4</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>com.site.maven.plugins</groupId>
        <artifactId>maven-codegen-plugin</artifactId>
        <version>1.0.7</version>
        <executions>
          <execution>
            <id>generate plexus component descriptor</id>
            <phase>process-classes</phase>
            <goals>
              <goal>plexus</goal>
            </goals>
            <configuration>
              <className>com.dianping.cat.hadoop.plexus.ComponentsConfigurator</className>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <archive>
            <index>true</index>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>com.dianping.cat.hadoop.job.BrowserAnalyzer</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

cat-hadoop/src/main/java/com/dianping/cat/hadoop/HdfsDumpConsumer.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop;

import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;

public class HdfsDumpConsumer implements MessageConsumer {
    public static final String ID = "dump-to-hdfs";

    @Inject
    private MessageStorage m_storage;

    @Inject
    private String m_domain;

    @Override
    public void consume(MessageTree tree) {
        if (m_domain == null || m_domain.equals(tree.getDomain())) {
            m_storage.store(tree);
        }
    }

    @Override
    public String getConsumerId() {
        return ID;
    }

    @Override
    public String getDomain() {
        return m_domain;
    }

    public void setDomain(String domain) {
        m_domain = domain;
    }
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/build/ComponentsConfigurator.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.build;

import java.util.ArrayList;
import java.util.List;

import com.dianping.cat.hadoop.HdfsDumpConsumer;
import com.dianping.cat.hadoop.hdfs.OutputChannelManager;
import com.dianping.cat.hadoop.hdfs.DefaultOutputChannelManager;
import com.dianping.cat.hadoop.hdfs.DefaultOutputChannel;
import com.dianping.cat.hadoop.hdfs.HdfsMessageStorage;
import com.dianping.cat.hadoop.hdfs.OutputChannel;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;

public class ComponentsConfigurator extends AbstractResourceConfigurator {
    @Override
    public List<Component> defineComponents() {
        List<Component> all = new ArrayList<Component>();

        if (isEnv("dev") || property("env", null) == null) {
            all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
                    .req(MessageCodec.class, "plain-text") //
                    .config(E("maxSize").value(String.valueOf(2 * 1024 * 1024L))));
            all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
                    .req(MessagePathBuilder.class));
        } else {
            all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
                    .req(MessageCodec.class, "plain-text") //
                    .config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L))));
            all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
                    .req(MessagePathBuilder.class) //
                    .config(E("baseDir").value("data"), //
                            E("serverUri").value("/catlog")));
        }

        all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
                .req(OutputChannelManager.class));
        all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) //
                .req(MessageStorage.class, "hdfs"));

        return all;
    }

    public static void main(String[] args) {
        generatePlexusComponentsXmlFile(new ComponentsConfigurator());
    }
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/DefaultOutputChannel.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import java.io.IOException;
import java.io.OutputStream;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;

public class DefaultOutputChannel implements OutputChannel {
    @Inject
    private MessageCodec m_codec;

    @Inject
    private int m_maxSize = 0; // 0 means unlimited

    @Inject
    private long m_ttl = 90 * 1000L; // 90 seconds

    private OutputStream m_out;

    private int m_count;

    private long m_timestamp;

    @Override
    public void close() {
        if (m_out != null) {
            try {
                m_out.close();
                m_out = null;
            } catch (IOException e) {
                // ignore it
            }
        }
    }

    @Override
    public void initialize(OutputStream out) {
        m_out = out;
        m_timestamp = System.currentTimeMillis();
    }

    @Override
    public boolean isExpired() {
        long now = System.currentTimeMillis();

        return now - m_timestamp > m_ttl;
    }

    public void setMaxSize(int maxSize) {
        m_maxSize = maxSize;
    }

    public void setTtl(long ttl) {
        m_ttl = ttl;
    }

    @Override
    public boolean write(MessageTree tree) throws IOException {
        ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);

        m_codec.encode(tree, buf);

        int length = buf.readInt();

        if (m_maxSize > 0 && m_count + length + 1 > m_maxSize) {
            // exceed the max size
            return false;
        }

        buf.getBytes(buf.readerIndex(), m_out, length);

        // a blank line used to separate two message trees
        m_out.write('\n');
        m_count += length + 1;

        return true;
    }
}
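
Reviewer note on the removed size cap: `DefaultOutputChannel.write` refuses a record once the running byte count would exceed `maxSize`, and the caller (`HdfsMessageStorage`) then reopens the channel with `forceNew` so the record lands in the next `-<index>` file. The sketch below only illustrates that pattern with in-memory streams. `SizeCappedWriter` and `RolloverDemo` are hypothetical names, not part of CAT, and no HDFS or codec is involved.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the removed DefaultOutputChannel: counts bytes written
// and refuses a record that would push the output past maxSize (0 means unlimited).
class SizeCappedWriter {
    private final OutputStream out;
    private final int maxSize;
    private int count;

    SizeCappedWriter(OutputStream out, int maxSize) {
        this.out = out;
        this.maxSize = maxSize;
    }

    // Returns false, writing nothing, when the record plus its '\n' separator would exceed maxSize.
    boolean write(byte[] record) throws IOException {
        if (maxSize > 0 && count + record.length + 1 > maxSize) {
            return false;
        }
        out.write(record);
        out.write('\n');
        count += record.length + 1;
        return true;
    }
}

class RolloverDemo {
    // Writes every record, starting a new in-memory "file" whenever the current one is full.
    static List<ByteArrayOutputStream> writeAll(List<String> records, int maxSize) {
        List<ByteArrayOutputStream> files = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        SizeCappedWriter writer = new SizeCappedWriter(current, maxSize);
        files.add(current);
        try {
            for (String record : records) {
                byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
                if (!writer.write(bytes)) {
                    current = new ByteArrayOutputStream();
                    writer = new SizeCappedWriter(current, maxSize);
                    files.add(current);
                    // As in the removed code, the result of this second write is not checked,
                    // so a single record larger than maxSize would be dropped.
                    writer.write(bytes);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return files;
    }
}
```

With a cap of 10 bytes and three 4-byte records, the first two records fill the first output exactly (5 bytes each including the separator) and the third rolls over to a second output.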
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/DefaultOutputChannelManager.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;

import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;

public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled {
    @Inject
    private MessagePathBuilder m_builder;

    @Inject
    private String m_baseDir = "target/hdfs";

    @Inject
    private URI m_serverUri;

    private FileSystem m_fs;

    private Path m_basePath;

    private String m_ipAddress;

    private Map<String, OutputChannel> m_channels = new HashMap<String, OutputChannel>();

    private Map<String, Integer> m_indexes = new HashMap<String, Integer>();

    private Logger m_logger;

    @Override
    public void cleanupChannels() {
        try {
            List<String> expired = new ArrayList<String>();

            for (Map.Entry<String, OutputChannel> e : m_channels.entrySet()) {
                if (e.getValue().isExpired()) {
                    expired.add(e.getKey());
                }
            }

            for (String path : expired) {
                OutputChannel channel = m_channels.remove(path);

                closeChannel(channel);
            }
        } catch (Exception e) {
            m_logger.warn("Error when doing cleanup!", e);
        }
    }

    @Override
    public void closeAllChannels() {
        for (OutputChannel channel : m_channels.values()) {
            closeChannel(channel);
        }
    }

    @Override
    public void closeChannel(OutputChannel channel) {
        channel.close();
        super.release(channel);
    }

    @Override
    public void enableLogging(Logger logger) {
        m_logger = logger;
    }

    @Override
    public void initialize() throws InitializationException {
        try {
            Configuration config = new Configuration();
            FileSystem fs;

            config.setInt("io.file.buffer.size", 8192);

            if (m_serverUri == null) {
                fs = FileSystem.getLocal(config);
            } else {
                fs = FileSystem.get(m_serverUri, config); // TODO Not tested yet
            }

            m_fs = fs;
            m_basePath = new Path(m_fs.getWorkingDirectory(), m_baseDir);
        } catch (Exception e) {
            throw new InitializationException("Error when getting HDFS file system.", e);
        }

        try {
            InetAddress localHost = InetAddress.getLocalHost();

            m_ipAddress = localHost.getHostAddress();
        } catch (UnknownHostException e) {
            m_logger.warn("Unable to get local host!", e);
        }
    }

    @Override
    public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException {
        String path = m_builder.getHdfsPath(tree, m_ipAddress);
        OutputChannel channel = m_channels.get(path);

        if (channel == null) {
            Path file = new Path(m_basePath, path + "-0");
            OutputStream out = m_fs.create(file);

            channel = lookup(OutputChannel.class);
            channel.initialize(out);

            m_indexes.put(path, 0);
            m_channels.put(path, channel);
        } else if (forceNew) {
            int index = m_indexes.get(path);

            closeChannel(channel);
            m_indexes.put(path, ++index);

            Path file = new Path(m_basePath, path + "-" + index);
            OutputStream out = m_fs.create(file);

            channel = lookup(OutputChannel.class);
            channel.initialize(out);

            m_channels.put(path, channel);
        }

        return channel;
    }

    public void setBaseDir(String baseDir) {
        m_baseDir = baseDir;
    }

    public void setServerUri(String serverUri) {
        m_serverUri = URI.create(serverUri);
    }
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageStorage.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;

import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;

public class HdfsMessageStorage implements MessageStorage, Initializable, Disposable, LogEnabled {
    @Inject
    private OutputChannelManager m_manager;

    private WriteJob m_job;

    private Thread m_thread;

    private Logger m_logger;

    @Override
    public void dispose() {
        m_job.shutdown();

        try {
            m_thread.join();
        } catch (InterruptedException e) {
            // ignore it
        }
    }

    @Override
    public void enableLogging(Logger logger) {
        m_logger = logger;
    }

    @Override
    public void initialize() throws InitializationException {
        m_job = new WriteJob();

        Thread thread = new Thread(m_job);

        thread.setName("Storage write Job");
        thread.start();

        m_thread = thread;
    }

    @Override
    public String store(MessageTree tree) {
        m_job.append(tree);

        // Not available
        return null;
    }

    class WriteJob implements Runnable {
        private BlockingQueue<MessageTree> m_queue = new LinkedBlockingQueue<MessageTree>();

        private volatile boolean m_active = true;

        public void append(MessageTree tree) {
            try {
                m_queue.offer(tree);
            } catch (Exception e) {
                m_logger.warn("Error when adding job to queue.", e);
            }
        }

        private void handle(MessageTree tree) {
            try {
                OutputChannel channel = m_manager.openChannel(tree, false);
                boolean success = channel.write(tree);

                if (!success) {
                    m_manager.closeChannel(channel);

                    channel = m_manager.openChannel(tree, true);
                    channel.write(tree);
                }
            } catch (IOException e) {
                m_logger.error("Error when writing to HDFS!", e);
            }
        }

        private boolean isActive() {
            synchronized (this) {
                return m_active;
            }
        }

        @Override
        public void run() {
            long lastCheckedTime = System.currentTimeMillis();

            try {
                while (isActive()) {
                    MessageTree tree = m_queue.poll(1000 * 1000L, TimeUnit.NANOSECONDS);

                    if (tree != null) {
                        handle(tree);
                    }

                    // check connection timeout and close it
                    if (System.currentTimeMillis() - lastCheckedTime >= 5 * 1000) {
                        lastCheckedTime = System.currentTimeMillis();
                        m_manager.cleanupChannels();
                    }
                }

                // process the remaining job in the queue
                while (!isActive()) {
                    MessageTree tree = m_queue.poll();

                    if (tree != null) {
                        handle(tree);
                    } else {
                        break;
                    }
                }
            } catch (Exception e) {
                m_logger.warn("Error when dumping message to HDFS.", e);
            }

            m_manager.closeAllChannels();
        }

        public void shutdown() {
            synchronized (this) {
                m_active = false;
            }
        }
    }
}
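
Reviewer note on the removed write loop: `WriteJob` polls its queue with a 1 ms timeout while active, then drains whatever is left after `shutdown()` before closing the channels. A minimal standalone sketch of that poll-then-drain shape follows. `QueueDrainingWorker` and `runOnce` are hypothetical names, strings stand in for `MessageTree`, and "handling" an item simply records it.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical reduction of the removed WriteJob: one consumer thread, one unbounded queue.
class QueueDrainingWorker implements Runnable {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private volatile boolean active = true;

    void append(String item) {
        queue.offer(item);
    }

    void shutdown() {
        active = false;
    }

    List<String> handled() {
        return handled;
    }

    @Override
    public void run() {
        try {
            // Normal operation: wait briefly for work so the active flag is rechecked often.
            while (active) {
                String item = queue.poll(1, TimeUnit.MILLISECONDS);
                if (item != null) {
                    handled.add(item);
                }
            }
            // After shutdown: process whatever is still queued, then stop.
            String item;
            while ((item = queue.poll()) != null) {
                handled.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts a worker, feeds it the items, shuts it down, and waits for it to finish.
    static List<String> runOnce(List<String> items) {
        QueueDrainingWorker worker = new QueueDrainingWorker();
        Thread thread = new Thread(worker, "queue-draining-worker");
        thread.start();
        for (String item : items) {
            worker.append(item);
        }
        worker.shutdown();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.handled();
    }
}
```

Because every item is queued before `shutdown()` is called, the drain step guarantees nothing is lost even if the worker never saw the items during its active loop.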
cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/MessageBucket.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import java.util.List;

import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;

/**
 * Map to one HDFS directory for one report
 */
public interface MessageBucket extends Bucket<MessageTree> {
    public void close();

    public boolean storeById(String id, MessageTree value, String... tags);

    public List<String> findAllIdsByTag(String tag);

    public MessageTree findNextById(String id, Direction direction, String tag); // tag:
    // "thread:101",
    // "session:abc",
    // "request:xyz",
    // "parent:xxx"

    public static enum Direction {
        FORWARD,
        BACKWARD;
    }
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/OutputChannel.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import java.io.IOException;
import java.io.OutputStream;

import com.dianping.cat.message.spi.MessageTree;

public interface OutputChannel {
    /**
     * Close the channel.
     */
    public void close();

    /**
     * Initialize the channel with an output stream.
     *
     * @param out
     */
    public void initialize(OutputStream out);

    /**
     * Check if the channel is expired.
     *
     * @return true if the channel is expired, false otherwise.
     */
    public boolean isExpired();

    /**
     * Output the message tree to the HDFS.
     *
     * @param tree
     * @return false if the max size is reached, false otherwise.
     * @throws IOException
     */
    public boolean write(MessageTree tree) throws IOException;
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/OutputChannelManager.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import java.io.IOException;

import com.dianping.cat.message.spi.MessageTree;

public interface OutputChannelManager {
    public void cleanupChannels();

    public void closeAllChannels();

    public void closeChannel(OutputChannel channel);

    public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException;
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/job/BrowserAnalyzer.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.job;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.dianping.cat.hadoop.mapreduce.MessageTreeInputFormat;
import com.dianping.cat.hadoop.mapreduce.MessageTreeWritable;
import com.dianping.cat.message.Message;
import com.site.helper.Files;

public class BrowserAnalyzer extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new BrowserAnalyzer(), args);

        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "browser analyzer");

        job.setJarByClass(BrowserAnalyzer.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setInputFormatClass(MessageTreeInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("target/hdfs/20120215/17/null"));
        FileOutputFormat.setOutputPath(job, new Path("target/browser"));
        Files.forDir().delete(new File("target/browser"), true);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;

            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write(key, result);
        }
    }

    public static class TokenizerMapper extends Mapper<Object, MessageTreeWritable, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);

        private Text m_word = new Text();

        public void map(Object key, MessageTreeWritable value, Context context) throws IOException, InterruptedException {
            Message message = value.get().getMessage();

            m_word.set(message.getType() + "." + message.getName());

            context.write(m_word, ONE);
        }
    }
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/mapreduce/MessageTreeInputFormat.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MessageTreeInputFormat extends FileInputFormat<LongWritable, MessageTreeWritable> {
    @Override
    public RecordReader<LongWritable, MessageTreeWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new MessageTreeReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // the file is already small enough, so do not need to split it
        return false;
    }
}

cat-hadoop/src/main/java/com/dianping/cat/hadoop/mapreduce/MessageTreeReader.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.mapreduce;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

import com.dianping.cat.message.spi.codec.EscapingBufferWriter;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;

public class MessageTreeReader extends RecordReader<LongWritable, MessageTreeWritable> {
    private CompressionCodecFactory m_compressionCodecs;

    private long m_start;

    private long m_pos;

    private long m_end;

    private BlockReader m_in;

    private LongWritable m_key;

    private MessageTreeWritable m_value;

    @Override
    public void close() throws IOException {
        if (m_in != null) {
            m_in.close();
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return m_key;
    }

    @Override
    public MessageTreeWritable getCurrentValue() throws IOException, InterruptedException {
        return m_value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (m_start == m_end) {
            return 0;
        } else {
            return Math.min(1.0f, (m_pos - m_start) / (float) (m_end - m_start));
        }
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration config = context.getConfiguration();

        m_start = split.getStart();
        m_end = m_start + split.getLength();
        m_compressionCodecs = new CompressionCodecFactory(config);

        // open the file and seek to the start of the split
        Path file = split.getPath();
        CompressionCodec codec = m_compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(config);
        FSDataInputStream fileIn = fs.open(split.getPath());
        boolean skipFirstLine = false;

        if (codec != null) {
            m_in = new BlockReader(codec.createInputStream(fileIn), config);
            m_end = Long.MAX_VALUE;
        } else {
            if (m_start != 0) {
                skipFirstLine = true;
                --m_start;
                fileIn.seek(m_start);
            }

            m_in = new BlockReader(fileIn, config);
        }

        if (skipFirstLine) {
            // skip first line and re-establish "start".
            m_start += m_in.readBlock(new MessageTreeWritable());
        }

        m_pos = m_start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (m_key == null) {
            m_key = new LongWritable();
        }

        m_key.set(m_pos);

        if (m_value == null) {
            m_value = new MessageTreeWritable();
        }

        int blockSize = 0;

        if (m_pos < m_end) {
            blockSize = m_in.readBlock(m_value);
            m_pos += blockSize;
        }

        if (blockSize == 0) {
            m_key = null;
            m_value = null;
            return false;
        } else {
            return true;
        }
    }

    static class BlockReader {
        private BufferedInputStream m_in;

        private PlainTextMessageCodec m_codec;

        public BlockReader(InputStream in, Configuration config) {
            int bufferSize = config.getInt("io.file.buffer.size", 8192);

            m_in = new BufferedInputStream(in, bufferSize);
            m_codec = new PlainTextMessageCodec();
            m_codec.setBufferWriter(new EscapingBufferWriter());
        }

        public void close() throws IOException {
            m_in.close();
        }

        public int readBlock(MessageTreeWritable tree) throws IOException {
            ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
            byte[] data = new byte[2048];
            byte prev = 0;
            int count = 0;

            m_in.mark(Integer.MAX_VALUE);

            int size = m_in.read(data);
            int pos = 0;

            while (size >= 0) {
                if (pos >= size) {
                    buf.writeBytes(data, 0, size);
                    count += size;

                    m_in.mark(Integer.MAX_VALUE);
                    size = m_in.read(data);
                    pos = 0;

                    if (size < 0) {
                        break;
                    }
                }

                byte b = data[pos++];

                if (b == '\n' && prev == '\n') {
                    buf.writeBytes(data, 0, pos - 1);
                    count += pos;

                    m_in.reset();
                    m_in.skip(pos);
                    break;
                }

                prev = b;
            }

            m_codec.decode(buf, tree.get());
            return count;
        }
    }
}
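
Reviewer note on the removed record format: `DefaultOutputChannel` appends one extra `'\n'` after each encoded message tree, and `BlockReader.readBlock` treats two consecutive newlines as the end of a block. The sketch below shows only that delimiter rule. It reads byte by byte instead of using the removed code's `mark`/`reset` buffering, and `BlankLineBlocks` is a hypothetical name, not part of CAT.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a stream into blocks separated by a blank line ("\n\n").
class BlankLineBlocks {
    static List<String> readBlocks(InputStream in) {
        List<String> blocks = new ArrayList<>();
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        try (BufferedInputStream buffered = new BufferedInputStream(in)) {
            int prev = -1;
            int b;
            while ((b = buffered.read()) != -1) {
                if (b == '\n' && prev == '\n') {
                    // Second consecutive newline: the block is complete.
                    // The block keeps its own trailing '\n' and drops the separator.
                    blocks.add(new String(block.toByteArray(), StandardCharsets.UTF_8));
                    block.reset();
                    prev = -1;
                    continue;
                }
                block.write(b);
                prev = b;
            }
            // A final block without a trailing separator is still returned.
            if (block.size() > 0) {
                blocks.add(new String(block.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return blocks;
    }
}
```

The rule only works if a blank line can never occur inside a record; the removed reader wires in `EscapingBufferWriter`, which presumably serves that purpose, though this diff does not show it.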
cat-hadoop/src/main/java/com/dianping/cat/hadoop/mapreduce/MessageTreeWritable.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;

public class MessageTreeWritable implements Writable {
    private MessageTree m_tree = new DefaultMessageTree();

    public MessageTree get() {
        return m_tree;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        throw new UnsupportedOperationException("This method should never be called, please check with the author if any problem.");
    }

    @Override
    public void write(DataOutput out) throws IOException {
        throw new UnsupportedOperationException("This method should never be called, please check with the author if any problem.");
    }
}

cat-hadoop/src/main/resources/META-INF/plexus/components.xml
Deleted · 100644 → 0 · View file @ 58ca1bcb

<plexus>
  <components>
    <component>
      <role>com.dianping.cat.hadoop.hdfs.OutputChannel</role>
      <implementation>com.dianping.cat.hadoop.hdfs.DefaultOutputChannel</implementation>
      <instantiation-strategy>per-lookup</instantiation-strategy>
      <configuration>
        <maxSize>2097152</maxSize>
      </configuration>
      <requirements>
        <requirement>
          <role>com.dianping.cat.message.spi.MessageCodec</role>
          <role-hint>plain-text</role-hint>
        </requirement>
      </requirements>
    </component>
    <component>
      <role>com.dianping.cat.hadoop.hdfs.OutputChannelManager</role>
      <implementation>com.dianping.cat.hadoop.hdfs.DefaultOutputChannelManager</implementation>
      <requirements>
        <requirement>
          <role>com.dianping.cat.message.spi.MessagePathBuilder</role>
        </requirement>
      </requirements>
    </component>
    <component>
      <role>com.dianping.cat.message.spi.MessageStorage</role>
      <role-hint>hdfs</role-hint>
      <implementation>com.dianping.cat.hadoop.hdfs.HdfsMessageStorage</implementation>
      <requirements>
        <requirement>
          <role>com.dianping.cat.hadoop.hdfs.OutputChannelManager</role>
        </requirement>
      </requirements>
    </component>
    <component>
      <role>com.dianping.cat.message.spi.MessageConsumer</role>
      <role-hint>dump-to-hdfs</role-hint>
      <implementation>com.dianping.cat.hadoop.HdfsDumpConsumer</implementation>
      <requirements>
        <requirement>
          <role>com.dianping.cat.message.spi.MessageStorage</role>
          <role-hint>hdfs</role-hint>
        </requirement>
      </requirements>
    </component>
  </components>
</plexus>

cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/CatTestCase.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import org.junit.After;
import org.junit.Before;

import com.dianping.cat.Cat;
import com.site.lookup.ComponentTestCase;

public abstract class CatTestCase extends ComponentTestCase {
    @Before
    public void before() throws Exception {
        Cat.initialize(getContainer(), null);
        Cat.setup(null, null);
    }

    @After
    public void after() throws Exception {
        Cat.reset();
        Cat.destroy();
    }
}
\ No newline at end of file
cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/HdfsMessageStorageTest.java
Deleted · 100644 → 0 · View file @ 58ca1bcb

package com.dianping.cat.hadoop.hdfs;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import com.dianping.cat.hadoop.hdfs.HdfsMessageStorage;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;

@RunWith(JUnit4.class)
public class HdfsMessageStorageTest extends CatTestCase {
    @Test
    public void test() throws Exception {
        MessageStorage storage = lookup(MessageStorage.class, "hdfs");
        MessageProducer producer = lookup(MessageProducer.class);
        InMemoryQueue queue = lookup(InMemoryQueue.class);

        for (int i = 0; i < 10000; i++) {
            Transaction t = producer.newTransaction("URL", "MyPage" + (int) (i / 500));

            try {
                // do your business here
                t.addData("k1", "v1");
                t.addData("k2", "v2");
                t.addData("k3", "v3");
                Thread.sleep(1);

                producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
                producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
                producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
                producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
                t.setStatus(Message.SUCCESS);
            } catch (Exception e) {
                t.setStatus(e);
            } finally {
                t.complete();
            }

            MessageTree tree = queue.poll(0);

            storage.store(tree);
        }

        ((HdfsMessageStorage) storage).dispose();
    }
}