Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
梦中观雨
cat
提交
d7c3d513
C
cat
项目概览
梦中观雨
/
cat
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
cat
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d7c3d513
编写于
3月 09, 2012
作者:
Y
You Yong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
modity the hadoop sql job
上级
3136ea10
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
797 addition
and
137 deletion
+797
-137
cat-job/datasources.xml
cat-job/datasources.xml
+17
-0
cat-job/pom.xml
cat-job/pom.xml
+15
-0
cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java
...va/com/dianping/cat/job/build/ComponentsConfigurator.java
+2
-0
cat-job/src/main/java/com/dianping/cat/job/build/DatabaseConfigurator.java
...java/com/dianping/cat/job/build/DatabaseConfigurator.java
+23
-0
cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java
...com/dianping/cat/job/hdfs/DefaultInputChannelManager.java
+0
-2
cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java
...in/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java
+3
-4
cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java
...om/dianping/cat/job/mapreduce/MessageTreeInputFormat.java
+2
-2
cat-job/src/main/java/com/dianping/cat/job/sql/DirectoryInputFormat.java
...n/java/com/dianping/cat/job/sql/DirectoryInputFormat.java
+35
-30
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java
...ob/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java
+78
-12
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java
.../src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java
+4
-3
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobReducer.java
...src/main/java/com/dianping/cat/job/sql/SqlJobReducer.java
+1
-1
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java
.../src/main/java/com/dianping/cat/job/sql/SqlJobResult.java
+55
-28
cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java
...c/main/java/com/dianping/cat/job/sql/SqlStatementKey.java
+30
-4
cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java
...main/java/com/dianping/cat/job/sql/SqlStatementValue.java
+12
-3
cat-job/src/main/java/com/dianping/cat/job/sql/database/ContainerBootstrap.java
...com/dianping/cat/job/sql/database/ContainerBootstrap.java
+26
-0
cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobMapper.java
...com/dianping/cat/job/sql/database/SqlRecordJobMapper.java
+13
-0
cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobReducer.java
...om/dianping/cat/job/sql/database/SqlRecordJobReducer.java
+48
-0
cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlReportRecord.java
...va/com/dianping/cat/job/sql/database/SqlReportRecord.java
+210
-0
cat-job/src/main/resources/META-INF/dal/jdbc/codegen.xml
cat-job/src/main/resources/META-INF/dal/jdbc/codegen.xml
+46
-0
cat-job/src/main/resources/META-INF/dal/jdbc/dal.xml
cat-job/src/main/resources/META-INF/dal/jdbc/dal.xml
+17
-0
cat-job/src/main/resources/META-INF/dal/jdbc/manifest.xml
cat-job/src/main/resources/META-INF/dal/jdbc/manifest.xml
+6
-0
cat-job/src/main/resources/META-INF/plexus/components.xml
cat-job/src/main/resources/META-INF/plexus/components.xml
+50
-0
cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java
.../java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java
+62
-47
cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java
...src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java
+41
-0
pom.xml
pom.xml
+1
-1
未找到文件。
cat-job/datasources.xml
0 → 100644
浏览文件 @
d7c3d513
<?xml version="1.0" encoding="utf-8"?>
<data-sources>
<data-source
id=
"cat"
>
<maximum-pool-size>
3
</maximum-pool-size>
<connection-timeout>
1s
</connection-timeout>
<idle-timeout>
10m
</idle-timeout>
<statement-cache-size>
1000
</statement-cache-size>
<properties>
<driver>
com.mysql.jdbc.Driver
</driver>
<url>
<![CDATA[jdbc:mysql://localhost:3306/cat]]>
</url>
<user>
root
</user>
<password>
123456
</password>
<connectionProperties>
<![CDATA[useUnicode=true&autoReconnect=true]]>
</connectionProperties>
</properties>
</data-source>
</data-sources>
cat-job/pom.xml
浏览文件 @
d7c3d513
...
...
@@ -14,6 +14,14 @@
<groupId>
com.dianping.cat
</groupId>
<artifactId>
cat-core
</artifactId>
</dependency>
<dependency>
<groupId>
com.site.dal
</groupId>
<artifactId>
dal-jdbc
</artifactId>
</dependency>
<dependency>
<groupId>
mysql
</groupId>
<artifactId>
mysql-connector-java
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.hadoop
</groupId>
<artifactId>
hadoop-core
</artifactId>
...
...
@@ -47,6 +55,13 @@
<artifactId>
maven-codegen-plugin
</artifactId>
<version>
1.0.10
</version>
<executions>
<execution>
<id>
generate dal jdbc model
</id>
<phase>
generate-sources
</phase>
<goals>
<goal>
dal-jdbc
</goal>
</goals>
</execution>
<execution>
<id>
generate plexus component descriptor
</id>
<phase>
process-classes
</phase>
...
...
cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java
浏览文件 @
d7c3d513
...
...
@@ -42,6 +42,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all
.
add
(
C
(
MessageConsumer
.
class
,
HdfsDumpConsumer
.
ID
,
HdfsDumpConsumer
.
class
)
//
.
req
(
MessageStorage
.
class
,
"hdfs"
));
all
.
addAll
(
new
DatabaseConfigurator
().
defineComponents
());
return
all
;
}
...
...
cat-job/src/main/java/com/dianping/cat/job/build/DatabaseConfigurator.java
0 → 100644
浏览文件 @
d7c3d513
package
com.dianping.cat.job.build
;
import
java.util.ArrayList
;
import
java.util.List
;
import
com.dianping.cat.job.sql.dal._INDEX
;
import
com.site.dal.jdbc.configuration.AbstractJdbcResourceConfigurator
;
import
com.site.lookup.configuration.Component
;
final
class
DatabaseConfigurator
extends
AbstractJdbcResourceConfigurator
{
public
List
<
Component
>
defineComponents
()
{
List
<
Component
>
all
=
new
ArrayList
<
Component
>();
all
.
add
(
defineJdbcDataSourceConfigurationManagerComponent
(
"datasources.xml"
));
all
.
add
(
defineJdbcDataSourceComponent
(
"cat"
,
"${jdbc.driver}"
,
"${jdbc.url}"
,
"${jdbc.user}"
,
"${jdbc.password}"
,
"<![CDATA[${jdbc.connectionProperties}]]>"
));
defineSimpleTableProviderComponents
(
all
,
"cat"
,
_INDEX
.
getEntityClasses
());
defineDaoComponents
(
all
,
_INDEX
.
getDaoClasses
());
return
all
;
}
}
\ No newline at end of file
cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java
浏览文件 @
d7c3d513
package
com.dianping.cat.job.hdfs
;
import
java.io.IOException
;
import
java.io.OutputStream
;
import
java.net.InetAddress
;
import
java.net.URI
;
import
java.net.UnknownHostException
;
...
...
@@ -20,7 +19,6 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException
;
import
com.dianping.cat.message.spi.MessagePathBuilder
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.site.lookup.ContainerHolder
;
import
com.site.lookup.annotation.Inject
;
...
...
cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java
浏览文件 @
d7c3d513
...
...
@@ -144,10 +144,9 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos
}
}
}
@Override
public
MessageTree
get
(
String
messageId
)
{
// TODO Auto-generated method stub
public
MessageTree
get
(
String
messageId
){
//TODO
return
null
;
}
}
cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java
浏览文件 @
d7c3d513
...
...
@@ -9,9 +9,9 @@ import org.apache.hadoop.mapreduce.JobContext;
import
org.apache.hadoop.mapreduce.RecordReader
;
import
org.apache.hadoop.mapreduce.TaskAttemptContext
;
import
com.dianping.cat.job.sql.
MutiFile
InputFormat
;
import
com.dianping.cat.job.sql.
Directory
InputFormat
;
public
class
MessageTreeInputFormat
extends
MutiFile
InputFormat
<
LongWritable
,
MessageTreeWritable
>
{
public
class
MessageTreeInputFormat
extends
Directory
InputFormat
<
LongWritable
,
MessageTreeWritable
>
{
@Override
public
RecordReader
<
LongWritable
,
MessageTreeWritable
>
createRecordReader
(
InputSplit
split
,
TaskAttemptContext
context
)
throws
IOException
,
InterruptedException
{
...
...
cat-job/src/main/java/com/dianping/cat/job/sql/
MutiFile
InputFormat.java
→
cat-job/src/main/java/com/dianping/cat/job/sql/
Directory
InputFormat.java
浏览文件 @
d7c3d513
...
...
@@ -13,11 +13,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import
org.apache.hadoop.mapreduce.lib.input.InvalidInputException
;
import
org.apache.hadoop.mapreduce.security.TokenCache
;
public
abstract
class
DirectoryInputFormat
<
K
,
V
>
extends
FileInputFormat
<
K
,
V
>
{
public
abstract
class
MutiFileInputFormat
<
K
,
V
>
extends
FileInputFormat
<
K
,
V
>
{
public
List
<
FileStatus
>
listStatus
(
JobContext
job
)
throws
IOException
{
List
<
FileStatus
>
result
=
new
ArrayList
<
FileStatus
>();
Path
[]
dirs
=
getInputPaths
(
job
);
if
(
dirs
.
length
==
0
)
{
...
...
@@ -32,9 +31,10 @@ public abstract class MutiFileInputFormat<K,V> extends FileInputFormat<K, V> {
if
(
jobFilter
!=
null
)
{
filters
.
add
(
jobFilter
);
}
//Add Default Hidden file
//
Add Default Hidden file
PathFilter
inputFilter
=
new
MultiPathFilter
(
filters
);
filters
.
add
(
hiddenFileFilter
);
for
(
int
i
=
0
;
i
<
dirs
.
length
;
++
i
)
{
Path
p
=
dirs
[
i
];
FileSystem
fs
=
p
.
getFileSystem
(
job
.
getConfiguration
());
...
...
@@ -44,7 +44,7 @@ public abstract class MutiFileInputFormat<K,V> extends FileInputFormat<K, V> {
}
else
if
(
matches
.
length
==
0
)
{
errors
.
add
(
new
IOException
(
"Input Pattern "
+
p
+
" matches 0 files"
));
}
else
{
for
(
FileStatus
globStat
:
matches
)
{
addFileStat
(
result
,
inputFilter
,
fs
,
globStat
);
}
...
...
@@ -58,32 +58,37 @@ public abstract class MutiFileInputFormat<K,V> extends FileInputFormat<K, V> {
}
public
void
addFileStat
(
List
<
FileStatus
>
result
,
PathFilter
inputFilter
,
FileSystem
fs
,
FileStatus
globStat
)
throws
IOException
{
if
(
globStat
.
isDir
())
{
for
(
FileStatus
stat
:
fs
.
listStatus
(
globStat
.
getPath
(),
inputFilter
))
{
addFileStat
(
result
,
inputFilter
,
fs
,
stat
);
}
}
else
{
result
.
add
(
globStat
);
System
.
out
.
println
(
globStat
.
getPath
().
getName
());
}
}
throws
IOException
{
if
(
globStat
.
isDir
())
{
for
(
FileStatus
stat
:
fs
.
listStatus
(
globStat
.
getPath
(),
inputFilter
))
{
addFileStat
(
result
,
inputFilter
,
fs
,
stat
);
}
}
else
{
result
.
add
(
globStat
);
}
}
private
static
final
PathFilter
hiddenFileFilter
=
new
PathFilter
()
{
public
boolean
accept
(
Path
p
)
{
String
name
=
p
.
getName
();
return
!
name
.
startsWith
(
"_"
)
&&
!
name
.
startsWith
(
"."
);
}
};
private
static
class
MultiPathFilter
implements
PathFilter
{
private
List
<
PathFilter
>
filters
;
private
List
<
PathFilter
>
filters
;
public
MultiPathFilter
(
List
<
PathFilter
>
filters
)
{
this
.
filters
=
filters
;
}
public
MultiPathFilter
(
List
<
PathFilter
>
filters
)
{
this
.
filters
=
filters
;
}
public
boolean
accept
(
Path
path
)
{
for
(
PathFilter
filter
:
filters
)
{
if
(!
filter
.
accept
(
path
))
{
return
false
;
}
}
return
true
;
}
}
public
boolean
accept
(
Path
path
)
{
for
(
PathFilter
filter
:
filters
)
{
if
(!
filter
.
accept
(
path
))
{
return
false
;
}
}
return
true
;
}
}
}
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java
浏览文件 @
d7c3d513
...
...
@@ -7,6 +7,8 @@ import java.util.Date;
import
org.apache.hadoop.conf.Configuration
;
import
org.apache.hadoop.conf.Configured
;
import
org.apache.hadoop.fs.Path
;
import
org.apache.hadoop.io.IntWritable
;
import
org.apache.hadoop.io.Text
;
import
org.apache.hadoop.mapreduce.Job
;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
;
...
...
@@ -14,16 +16,39 @@ import org.apache.hadoop.util.Tool;
import
org.apache.hadoop.util.ToolRunner
;
import
com.dianping.cat.job.mapreduce.MessageTreeInputFormat
;
import
com.dianping.cat.job.sql.database.SqlRecordJobMapper
;
import
com.dianping.cat.job.sql.database.SqlRecordJobReducer
;
import
com.site.helper.Files
;
public
class
SqlJobMain
extends
Configured
implements
Tool
{
private
static
final
String
DEFAUL_IN_PATH
=
"target/hdfs/"
;
private
static
final
String
DEFAUL_OUT_PATH
=
"target/cat/sql/"
;
private
static
final
String
DEFAULT_FINAL_PATH
=
"target/cat/result/"
;
private
static
final
int
DEFAULT_REDUCE_NUMBER
=
3
;
/**
* The job process last hour data when no args default. The args[0] can set
* the number of reduce; The args[1] is for input path
*/
public
static
void
main
(
String
[]
args
)
throws
Exception
{
int
exitCode
=
ToolRunner
.
run
(
new
Configuration
(),
new
SqlJobMain
(),
args
);
System
.
exit
(
exitCode
);
}
private
String
getLastHoursString
(
int
hours
)
{
Date
date
=
new
Date
();
long
lastHour
=
date
.
getTime
();
lastHour
=
lastHour
-
lastHour
%
(
60
*
60
*
1000
)
-
60
*
60
*
1000
*
hours
;
date
.
setTime
(
lastHour
);
return
new
SimpleDateFormat
(
"yyyyMMdd/HH/"
).
format
(
date
);
}
@Override
public
int
run
(
String
[]
args
)
throws
Exception
{
Configuration
conf
=
getConf
();
...
...
@@ -36,19 +61,60 @@ public class SqlJobMain extends Configured implements Tool {
job
.
setOutputKeyClass
(
SqlStatementKey
.
class
);
job
.
setOutputValueClass
(
SqlJobResult
.
class
);
job
.
setPartitionerClass
(
SqlJobPatitioner
.
class
);
job
.
setMapOutputKeyClass
(
SqlStatementKey
.
class
);
job
.
setMapOutputValueClass
(
SqlStatementValue
.
class
);
job
.
setNumReduceTasks
(
2
);
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyyMMdd/HH/"
);
String
dateStr
=
sdf
.
format
(
new
Date
());
String
path
=
"target/hdfs/20120306/23/null/"
;
FileInputFormat
.
addInputPath
(
job
,
new
Path
(
path
));
FileOutputFormat
.
setOutputPath
(
job
,
new
Path
(
"target/sql"
));
Files
.
forDir
().
delete
(
new
File
(
"target/sql"
),
true
);
job
.
setNumReduceTasks
(
DEFAULT_REDUCE_NUMBER
);
if
(
args
.
length
>
0
)
{
try
{
job
.
setNumReduceTasks
(
Integer
.
parseInt
(
args
[
0
]));
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"The input args of the job is not correct, the args[0] should be integer!"
);
return
0
;
}
}
String
hourStr
=
getLastHoursString
(
1
);
String
inputPath
=
DEFAUL_IN_PATH
+
hourStr
;
String
outputPath
=
DEFAUL_OUT_PATH
+
hourStr
;
if
(
args
.
length
>
1
)
{
if
(
args
.
length
>=
2
)
{
inputPath
=
args
[
1
];
}
}
System
.
out
.
println
(
String
.
format
(
"InputPath: %s,OutPath %s"
,
inputPath
,
outputPath
));
FileInputFormat
.
addInputPath
(
job
,
new
Path
(
inputPath
));
FileOutputFormat
.
setOutputPath
(
job
,
new
Path
(
outputPath
));
Files
.
forDir
().
delete
(
new
File
(
outputPath
),
true
);
if
(
job
.
waitForCompletion
(
true
))
{
return
runSqlRecordJob
(
hourStr
);
}
else
{
return
0
;
}
//String hourStr = getLastHoursString(1);
//return runSqlRecordJob(hourStr);
}
/*
* insert the result to mysql
*/
private
int
runSqlRecordJob
(
String
currentHour
)
throws
Exception
{
Configuration
conf
=
getConf
();
conf
.
set
(
"JobHour"
,
currentHour
);
Job
job
=
new
Job
(
conf
,
"Sql Record"
);
job
.
setJarByClass
(
SqlJobMain
.
class
);
job
.
setMapperClass
(
SqlRecordJobMapper
.
class
);
job
.
setReducerClass
(
SqlRecordJobReducer
.
class
);
job
.
setMapOutputKeyClass
(
Text
.
class
);
job
.
setMapOutputValueClass
(
Text
.
class
);
FileInputFormat
.
addInputPath
(
job
,
new
Path
(
DEFAUL_OUT_PATH
+
currentHour
));
FileOutputFormat
.
setOutputPath
(
job
,
new
Path
(
DEFAULT_FINAL_PATH
));
Files
.
forDir
().
delete
(
new
File
(
DEFAULT_FINAL_PATH
),
true
);
return
job
.
waitForCompletion
(
true
)
?
0
:
1
;
}
}
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java
浏览文件 @
d7c3d513
...
...
@@ -40,15 +40,16 @@ public class SqlJobMapper extends Mapper<Object, MessageTreeWritable, SqlStateme
if
(
type
.
equals
(
"SQL"
))
{
SqlStatementKey
statementKey
=
new
SqlStatementKey
();
String
statement
=
transaction
.
getName
();
String
name
=
transaction
.
getName
();
String
statement
=
transaction
.
getData
().
toString
();
long
duration
=
transaction
.
getDuration
();
int
flag
=
0
;
statementKey
.
setDomain
(
new
Text
(
domain
)).
setStatement
(
new
Text
(
statement
));
statementKey
.
setDomain
(
new
Text
(
domain
)).
set
Name
(
new
Text
(
name
)).
set
Statement
(
new
Text
(
statement
));
if
(!
transaction
.
getStatus
().
equals
(
Transaction
.
SUCCESS
))
{
flag
=
1
;
}
SqlStatementValue
result
=
new
SqlStatementValue
(
flag
,
duration
);
SqlStatementValue
result
=
new
SqlStatementValue
(
flag
,
duration
,
tree
.
getMessageId
()
);
context
.
write
(
statementKey
,
result
);
}
...
...
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobReducer.java
浏览文件 @
d7c3d513
...
...
@@ -10,7 +10,7 @@ public class SqlJobReducer extends Reducer<SqlStatementKey, SqlStatementValue, S
SqlJobResult
result
=
new
SqlJobResult
();
for
(
SqlStatementValue
val
:
values
)
{
result
.
add
(
val
.
getValue
(),
val
.
getFlag
());
result
.
add
(
val
.
getValue
(),
val
.
getFlag
()
,
val
.
getSampleUrl
()
);
}
context
.
write
(
key
,
result
);
}
...
...
cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java
浏览文件 @
d7c3d513
...
...
@@ -3,6 +3,10 @@ package com.dianping.cat.job.sql;
import
java.io.DataInput
;
import
java.io.DataOutput
;
import
java.io.IOException
;
import
java.text.DecimalFormat
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.List
;
import
org.apache.hadoop.io.Writable
;
...
...
@@ -10,51 +14,61 @@ public class SqlJobResult implements Writable {
private
static
final
double
LONG_TIME
=
50
;
private
int
m_count
;
private
static
final
char
SPIT
=
'\t'
;
private
double
m_sum
;
private
List
<
Double
>
m_durations
=
new
ArrayList
<
Double
>()
;
private
double
m_sum2
;
private
int
m_successCount
;
private
int
m_failureCount
;
private
int
m_longTimeCount
;
public
String
toString
()
{
StringBuffer
sb
=
new
StringBuffer
();
sb
.
append
(
"Count: "
).
append
(
m_count
).
append
(
"\t"
).
append
(
"Sum: "
).
append
(
m_sum
).
append
(
"\t"
)
//
.
append
(
"Sum2: "
).
append
(
m_sum2
).
append
(
"\t"
).
append
(
"Std: "
).
append
(
getStd
());
sb
.
append
(
"\t"
).
append
(
"Success: "
).
append
(
m_successCount
);
sb
.
append
(
"\t"
).
append
(
"Failure: "
).
append
(
m_failureCount
).
append
(
"\t"
).
append
(
"Long: "
).
append
(
m_longTimeCount
);
return
sb
.
toString
();
}
private
double
m_max
=
Double
.
MIN_VALUE
;
private
double
m_min
=
Double
.
MAX_VALUE
;
private
double
m_sum
;
private
double
m_sum2
;
private
List
<
String
>
m_urls
=
new
ArrayList
<
String
>();
private
DecimalFormat
df
=
new
DecimalFormat
(
"#.##"
);
public
void
add
(
double
value
,
int
flag
)
{
m_count
++;
public
void
add
(
double
value
,
int
flag
,
String
url
)
{
m_sum
+=
value
;
m_sum2
=
m_sum2
+
value
*
value
;
m_sum2
+=
value
*
value
;
if
(
flag
==
1
)
{
m_failureCount
++;
}
else
{
m_successCount
++;
}
if
(
value
>
LONG_TIME
)
{
if
(
value
>
LONG_TIME
)
{
m_longTimeCount
++;
}
if
(
value
<
m_min
)
{
m_min
=
value
;
}
if
(
value
>
m_max
)
{
m_max
=
value
;
}
m_durations
.
add
(
value
);
int
size
=
m_urls
.
size
();
if
(
size
==
0
)
{
m_urls
.
add
(
url
);
}
else
if
(
size
==
1
&&
flag
==
1
)
{
m_urls
.
add
(
url
);
}
}
public
double
getAvg
()
{
if
(
m_count
==
0
)
{
return
0
;
Collections
.
sort
(
m_durations
);
int
size
=
95
*
m_durations
.
size
()
/
100
;
double
sum
=
0
;
for
(
int
i
=
0
;
i
<
size
;
i
++)
{
sum
=
sum
+
m_durations
.
get
(
i
);
}
return
m_sum
/
m_count
;
}
private
double
getStd
()
{
double
ave
=
getAvg
();
return
Math
.
sqrt
(
m_sum2
/
m_count
-
2
*
ave
*
ave
+
ave
*
ave
);
return
sum
/
(
double
)
size
;
}
@Override
...
...
@@ -63,10 +77,23 @@ public class SqlJobResult implements Writable {
"This method should never be called, please check with the author if any problem."
);
}
public
String
toString
()
{
StringBuffer
sb
=
new
StringBuffer
();
sb
.
append
(
m_durations
.
size
()).
append
(
SPIT
).
append
(
m_failureCount
).
append
(
SPIT
).
append
(
m_longTimeCount
)
.
append
(
SPIT
);
sb
.
append
(
df
.
format
(
m_min
)).
append
(
SPIT
).
append
(
df
.
format
(
m_max
)).
append
(
SPIT
).
append
(
df
.
format
(
m_sum
))
.
append
(
SPIT
).
append
(
df
.
format
(
m_sum2
)).
append
(
SPIT
).
append
(
df
.
format
(
getAvg
())).
append
(
SPIT
);
int
size
=
m_urls
.
size
();
sb
.
append
(
m_urls
.
get
(
size
-
1
));
return
sb
.
toString
();
}
@Override
public
void
write
(
DataOutput
arg0
)
throws
IOException
{
throw
new
UnsupportedOperationException
(
"This method should never be called, please check with the author if any problem."
);
}
}
\ No newline at end of file
cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java
浏览文件 @
d7c3d513
...
...
@@ -8,15 +8,27 @@ import org.apache.hadoop.io.Text;
import
org.apache.hadoop.io.WritableComparable
;
public
class
SqlStatementKey
implements
WritableComparable
<
SqlStatementKey
>
{
private
Text
m_
statement
;
private
Text
m_
name
;
private
Text
m_domain
;
private
Text
m_statement
;
public
SqlStatementKey
()
{
m_name
=
new
Text
();
m_statement
=
new
Text
();
m_domain
=
new
Text
();
}
public
Text
getName
()
{
return
m_name
;
}
public
SqlStatementKey
setName
(
Text
name
)
{
m_name
=
name
;
return
this
;
}
public
Text
getDomain
()
{
return
m_domain
;
}
...
...
@@ -37,12 +49,14 @@ public class SqlStatementKey implements WritableComparable<SqlStatementKey> {
@Override
public
void
write
(
DataOutput
out
)
throws
IOException
{
m_name
.
write
(
out
);
m_domain
.
write
(
out
);
m_statement
.
write
(
out
);
}
@Override
public
void
readFields
(
DataInput
in
)
throws
IOException
{
m_name
.
readFields
(
in
);
m_domain
.
readFields
(
in
);
m_statement
.
readFields
(
in
);
}
...
...
@@ -50,10 +64,10 @@ public class SqlStatementKey implements WritableComparable<SqlStatementKey> {
@Override
public
int
compareTo
(
SqlStatementKey
key
)
{
if
(
m_domain
.
toString
().
equals
(
key
.
getDomain
().
toString
()))
{
if
(
m_
statement
.
toString
().
equals
(
key
.
getStatement
().
toString
()))
{
if
(
m_
name
.
toString
().
equals
(
key
.
getName
().
toString
()))
{
return
0
;
}
else
{
return
m_
statement
.
compareTo
(
key
.
getStatement
());
return
m_
name
.
compareTo
(
key
.
getName
());
}
}
return
m_domain
.
compareTo
(
key
.
getDomain
());
...
...
@@ -61,6 +75,18 @@ public class SqlStatementKey implements WritableComparable<SqlStatementKey> {
@Override
public
String
toString
()
{
return
String
.
format
(
"[domain:%s statement:%s]"
,
m_domain
,
m_statement
);
String
statement
=
m_statement
.
toString
();
// to assure the output string not contain \t
statement
=
statement
.
replaceAll
(
"\n"
,
" "
);
statement
=
statement
.
replaceAll
(
"\t"
,
" "
);
statement
=
statement
.
replaceAll
(
"\""
,
"\'"
);
m_statement
=
new
Text
(
statement
);
String
name
=
m_name
.
toString
();
name
=
name
.
replaceAll
(
"\n"
,
" "
);
name
=
name
.
replaceAll
(
"\t"
,
" "
);
name
=
name
.
replaceAll
(
"\""
,
"\'"
);
m_name
=
new
Text
(
name
);
return
String
.
format
(
"%s\t%s\t%s"
,
m_domain
,
m_name
,
m_statement
);
}
}
cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java
浏览文件 @
d7c3d513
...
...
@@ -4,6 +4,7 @@ import java.io.DataInput;
import
java.io.DataOutput
;
import
java.io.IOException
;
import
org.apache.hadoop.io.Text
;
import
org.apache.hadoop.io.Writable
;
public
class
SqlStatementValue
implements
Writable
{
...
...
@@ -12,8 +13,10 @@ public class SqlStatementValue implements Writable {
public
double
m_value
;
public
Text
m_sampleUrl
;
public
SqlStatementValue
(){
m_sampleUrl
=
new
Text
();
}
public
int
getFlag
()
{
...
...
@@ -23,22 +26,28 @@ public class SqlStatementValue implements Writable {
public
double
getValue
()
{
return
m_value
;
}
public
String
getSampleUrl
(){
return
m_sampleUrl
.
toString
();
}
public
SqlStatementValue
(
int
flag
,
double
value
)
{
public
SqlStatementValue
(
int
flag
,
double
value
,
String
url
)
{
m_flag
=
flag
;
m_value
=
value
;
m_sampleUrl
=
new
Text
(
url
);
}
@Override
public
void
readFields
(
DataInput
input
)
throws
IOException
{
m_flag
=
input
.
readInt
();
m_value
=
input
.
readDouble
();
m_sampleUrl
.
readFields
(
input
);
}
@Override
public
void
write
(
DataOutput
output
)
throws
IOException
{
output
.
writeInt
(
m_flag
);
output
.
writeDouble
(
m_value
);
m_sampleUrl
.
write
(
output
);
}
}
cat-job/src/main/java/com/dianping/cat/job/sql/database/ContainerBootstrap.java
0 → 100644
浏览文件 @
d7c3d513
package
com.dianping.cat.job.sql.database
;
import
org.codehaus.plexus.PlexusContainer
;
import
org.codehaus.plexus.component.repository.exception.ComponentLookupException
;
import
com.site.lookup.ContainerLoader
;
public
enum
ContainerBootstrap
{
INSTANCE
;
private
PlexusContainer
m_container
;
private
ContainerBootstrap
()
{
m_container
=
ContainerLoader
.
getDefaultContainer
();
}
@SuppressWarnings
(
"unchecked"
)
public
<
T
>
T
lookup
(
Class
<
T
>
role
)
throws
ComponentLookupException
{
return
(
T
)
m_container
.
lookup
(
role
);
}
@SuppressWarnings
(
"unchecked"
)
public
<
T
>
T
lookup
(
Class
<
T
>
role
,
String
roleHint
)
throws
ComponentLookupException
{
return
(
T
)
m_container
.
lookup
(
role
,
roleHint
);
}
}
cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobMapper.java
0 → 100644
浏览文件 @
d7c3d513
package
com.dianping.cat.job.sql.database
;
import
java.io.IOException
;
import
org.apache.hadoop.io.Text
;
import
org.apache.hadoop.mapreduce.Mapper
;
public
class
SqlRecordJobMapper
extends
Mapper
<
Object
,
Text
,
Text
,
Text
>
{
public
void
map
(
Object
key
,
Text
value
,
Context
context
)
throws
IOException
,
InterruptedException
{
context
.
write
(
value
,
new
Text
(
context
.
getConfiguration
().
get
(
"JobHour"
,
""
)));
}
}
cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobReducer.java
0 → 100644
浏览文件 @
d7c3d513
package
com.dianping.cat.job.sql.database
;
import
java.io.IOException
;
import
java.util.Date
;
import
org.apache.hadoop.io.Text
;
import
org.apache.hadoop.mapreduce.Reducer
;
import
org.codehaus.plexus.component.repository.exception.ComponentLookupException
;
import
com.dianping.cat.job.sql.dal.Sqlreport
;
import
com.dianping.cat.job.sql.dal.SqlreportDao
;
import
com.site.dal.jdbc.DalException
;
public
class
SqlRecordJobReducer
extends
Reducer
<
Text
,
Text
,
Text
,
Text
>{
public
void
reduce
(
Text
key
,
Iterable
<
Text
>
values
,
Context
context
)
throws
IOException
,
InterruptedException
{
Text
currentHour
=
values
.
iterator
().
next
();
SqlReportRecord
sql
=
new
SqlReportRecord
(
currentHour
.
toString
(),
key
.
toString
());
try
{
SqlreportDao
dao
=
ContainerBootstrap
.
INSTANCE
.
lookup
(
SqlreportDao
.
class
);
Sqlreport
row
=
dao
.
createLocal
();
row
.
setDomain
(
sql
.
getDomain
());
row
.
setTotalcount
(
sql
.
getTotalCount
());
row
.
setFailures
(
sql
.
getFailureCount
());
row
.
setLongsqls
(
sql
.
getLongCount
());
row
.
setAvg2value
(
sql
.
getAvg2
());
row
.
setSumvalue
(
sql
.
getSum
());
row
.
setSum2value
(
sql
.
getSum2
());
row
.
setMaxvalue
(
sql
.
getMax
());
row
.
setMinvalue
(
sql
.
getMin
());
row
.
setStatement
(
sql
.
getStatement
());
row
.
setName
(
sql
.
getName
());
row
.
setSamplelink
(
sql
.
getSampleLink
());
row
.
setTransactiondate
(
sql
.
getDate
());
row
.
setCreationdate
(
new
Date
());
dao
.
insert
(
row
);
}
catch
(
ComponentLookupException
e
)
{
// TODO Auto-generated catch block
e
.
printStackTrace
();
}
catch
(
DalException
e
)
{
// TODO Auto-generated catch block
e
.
printStackTrace
();
}
System
.
out
.
println
(
sql
);
}
}
cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlReportRecord.java
0 → 100644
浏览文件 @
d7c3d513
package
com.dianping.cat.job.sql.database
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
public
class
SqlReportRecord
{
private
String
m_domain
;
private
String
m_name
;
private
String
m_statement
;
private
String
m_sampleLink
;
private
Date
m_transactionDate
;
private
int
m_totalCount
;
private
int
m_failureCount
;
private
int
m_longCount
;
private
double
m_min
;
private
double
m_max
;
/**
* the avg2 is not contain the max of top5%
*/
private
double
m_avg2
;
private
double
m_sum
;
private
double
m_sum2
;
private
Date
m_creationDate
;
private
SimpleDateFormat
sdf
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
private
SimpleDateFormat
hourFormat
=
new
SimpleDateFormat
(
"yyyyMMdd/HH"
);
private
static
final
String
SPIT
=
"\t"
;
public
SqlReportRecord
(){
}
public
String
toString
()
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
sdf
.
format
(
m_transactionDate
)).
append
(
SPIT
);
sb
.
append
(
sdf
.
format
(
m_creationDate
)).
append
(
SPIT
);
sb
.
append
(
m_domain
).
append
(
SPIT
);
sb
.
append
(
m_name
).
append
(
SPIT
);
sb
.
append
(
m_statement
).
append
(
SPIT
);
sb
.
append
(
m_totalCount
).
append
(
SPIT
);
sb
.
append
(
m_failureCount
).
append
(
SPIT
);
sb
.
append
(
m_longCount
).
append
(
SPIT
);
sb
.
append
(
m_min
).
append
(
SPIT
);
sb
.
append
(
m_max
).
append
(
SPIT
);
sb
.
append
(
m_sum
).
append
(
SPIT
);
sb
.
append
(
m_sum2
).
append
(
SPIT
);
sb
.
append
(
m_avg2
).
append
(
SPIT
);
sb
.
append
(
m_sampleLink
).
append
(
SPIT
);
return
sb
.
toString
();
}
// domain1 SQLStatement Internal9 500 500 500 100 199 74750 11591750 147
public
SqlReportRecord
(
String
currentHour
,
String
text
)
{
try
{
m_transactionDate
=
hourFormat
.
parse
(
currentHour
);
}
catch
(
ParseException
e
)
{
Date
error
=
new
Date
();
error
.
setTime
(
0
);
m_transactionDate
=
error
;
}
m_creationDate
=
new
Date
();
String
[]
params
=
text
.
split
(
"\t"
);
m_domain
=
params
[
0
];
m_name
=
params
[
1
];
m_statement
=
params
[
2
];
m_totalCount
=
Integer
.
parseInt
(
params
[
3
]);
m_failureCount
=
Integer
.
parseInt
(
params
[
4
]);
m_longCount
=
Integer
.
parseInt
(
params
[
5
]);
m_min
=
Double
.
parseDouble
(
params
[
6
]);
m_max
=
Double
.
parseDouble
(
params
[
7
]);
m_sum
=
Double
.
parseDouble
(
params
[
8
]);
m_sum2
=
Double
.
parseDouble
(
params
[
9
]);
m_avg2
=
Double
.
parseDouble
(
params
[
10
]);
m_sampleLink
=
params
[
11
];
}
public
String
getName
()
{
return
m_name
;
}
public
void
setName
(
String
name
)
{
m_name
=
name
;
}
public
String
getDomain
()
{
return
m_domain
;
}
public
void
setDomain
(
String
domain
)
{
m_domain
=
domain
;
}
public
String
getStatement
()
{
return
m_statement
;
}
public
void
setStatement
(
String
statement
)
{
m_statement
=
statement
;
}
public
String
getSampleLink
()
{
return
m_sampleLink
;
}
public
void
setSampleLink
(
String
sampleLink
)
{
m_sampleLink
=
sampleLink
;
}
public
Date
getDate
()
{
return
m_transactionDate
;
}
public
void
setDate
(
Date
date
)
{
m_transactionDate
=
date
;
}
public
int
getTotalCount
()
{
return
m_totalCount
;
}
public
void
setTotalCount
(
int
totalCount
)
{
m_totalCount
=
totalCount
;
}
public
int
getFailureCount
()
{
return
m_failureCount
;
}
public
void
setFailureCount
(
int
failureCount
)
{
m_failureCount
=
failureCount
;
}
public
int
getLongCount
()
{
return
m_longCount
;
}
public
void
setLongCount
(
int
longCount
)
{
m_longCount
=
longCount
;
}
public
double
getMin
()
{
return
m_min
;
}
public
void
setMin
(
double
min
)
{
m_min
=
min
;
}
public
double
getMax
()
{
return
m_max
;
}
public
void
setMax
(
double
max
)
{
m_max
=
max
;
}
public
double
getAvg2
()
{
return
m_avg2
;
}
public
void
setAvg2
(
double
avg2
)
{
m_avg2
=
avg2
;
}
public
double
getSum
()
{
return
m_sum
;
}
public
void
setSum
(
double
sum
)
{
m_sum
=
sum
;
}
public
double
getSum2
()
{
return
m_sum2
;
}
public
void
setSum2
(
double
sum2
)
{
m_sum2
=
sum2
;
}
public
Date
getCreatTime
()
{
return
m_creationDate
;
}
public
void
setCreatTime
(
Date
creatTime
)
{
m_creationDate
=
creatTime
;
}
}
cat-job/src/main/resources/META-INF/dal/jdbc/codegen.xml
0 → 100644
浏览文件 @
d7c3d513
<?xml version="1.0" encoding="UTF-8"?>
<entities>
<entity
name=
"sqlreport"
table=
"SqlReport"
alias=
"s"
>
<member
name=
"id"
field=
"Id"
value-type=
"int"
length=
"10"
nullable=
"false"
key=
"true"
auto-increment=
"true"
/>
<member
name=
"domain"
field=
"Domain"
value-type=
"String"
length=
"50"
nullable=
"false"
/>
<member
name=
"statement"
field=
"Statement"
value-type=
"String"
length=
"500"
nullable=
"false"
/>
<member
name=
"totalcount"
field=
"TotalCount"
value-type=
"int"
length=
"10"
nullable=
"false"
/>
<member
name=
"failures"
field=
"Failures"
value-type=
"int"
length=
"10"
nullable=
"false"
/>
<member
name=
"longsqls"
field=
"LongSqls"
value-type=
"int"
length=
"10"
nullable=
"false"
/>
<member
name=
"minvalue"
field=
"MinValue"
value-type=
"long"
length=
"22"
nullable=
"false"
/>
<member
name=
"maxvalue"
field=
"MaxValue"
value-type=
"long"
length=
"22"
nullable=
"false"
/>
<member
name=
"avg2value"
field=
"Avg2Value"
value-type=
"long"
length=
"22"
nullable=
"false"
/>
<member
name=
"sumvalue"
field=
"SumValue"
value-type=
"long"
length=
"22"
nullable=
"false"
/>
<member
name=
"sum2value"
field=
"Sum2Value"
value-type=
"long"
length=
"22"
nullable=
"false"
/>
<member
name=
"samplelink"
field=
"SampleLink"
value-type=
"String"
length=
"64"
nullable=
"false"
/>
<member
name=
"transactiondate"
field=
"TransactionDate"
value-type=
"Date"
nullable=
"false"
/>
<member
name=
"creationdate"
field=
"CreationDate"
value-type=
"Date"
nullable=
"false"
/>
<member
name=
"name"
field=
"Name"
value-type=
"String"
length=
"100"
nullable=
"false"
/>
<var
name=
"key-id"
value-type=
"int"
key-member=
"id"
/>
<primary-key
name=
"PRIMARY"
members=
"Id"
/>
<readsets>
<readset
name=
"FULL"
all=
"true"
/>
</readsets>
<updatesets>
<updateset
name=
"FULL"
all=
"true"
/>
</updatesets>
<query-defs>
<query
name=
"find-by-PK"
type=
"SELECT"
>
<param
name=
"key-id"
/>
<statement>
<![CDATA[SELECT <FIELDS/> FROM <TABLE/> WHERE <FIELD name='id'/> = ${key-id}]]>
</statement>
</query>
<query
name=
"insert"
type=
"INSERT"
>
<statement>
<![CDATA[INSERT INTO <TABLE/>(<FIELDS/>) VALUES(<VALUES/>)]]>
</statement>
</query>
<query
name=
"update-by-PK"
type=
"UPDATE"
>
<param
name=
"key-id"
/>
<statement>
<![CDATA[UPDATE <TABLE/> SET <FIELDS/> WHERE <FIELD name='id'/> = ${key-id}]]>
</statement>
</query>
<query
name=
"delete-by-PK"
type=
"DELETE"
>
<param
name=
"key-id"
/>
<statement>
<![CDATA[DELETE FROM <TABLE/> WHERE <FIELD name='id'/> = ${key-id}]]>
</statement>
</query>
</query-defs>
</entity>
</entities>
cat-job/src/main/resources/META-INF/dal/jdbc/dal.xml
0 → 100644
浏览文件 @
d7c3d513
<?xml version="1.0" encoding="UTF-8"?>
<entities
do-package=
"com.dianping.cat.job.sql.dal"
gen=
"true"
>
<entity
name=
"sqlreport"
table=
"SqlReport"
alias=
"s"
>
<member
name=
"minvalue"
field=
"MinValue"
value-type=
"double"
length=
"22"
nullable=
"false"
/>
<member
name=
"maxvalue"
field=
"MaxValue"
value-type=
"double"
length=
"22"
nullable=
"false"
/>
<member
name=
"avg2value"
field=
"Avg2Value"
value-type=
"double"
length=
"22"
nullable=
"false"
/>
<member
name=
"sumvalue"
field=
"SumValue"
value-type=
"double"
length=
"22"
nullable=
"false"
/>
<member
name=
"sum2value"
field=
"Sum2Value"
value-type=
"double"
length=
"22"
nullable=
"false"
/>
</entity>
</entities>
cat-job/src/main/resources/META-INF/dal/jdbc/manifest.xml
0 → 100644
浏览文件 @
d7c3d513
<?xml version="1.0" encoding="UTF-8"?>
<manifest>
<file
path=
"codegen.xml"
/>
<file
path=
"dal.xml"
/>
</manifest>
cat-job/src/main/resources/META-INF/plexus/components.xml
浏览文件 @
d7c3d513
...
...
@@ -44,5 +44,55 @@
</requirement>
</requirements>
</component>
<component>
<role>
com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager
</role>
<implementation>
com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager
</implementation>
<configuration>
<datasourceFile>
datasources.xml
</datasourceFile>
</configuration>
</component>
<component>
<role>
com.site.dal.jdbc.datasource.DataSource
</role>
<role-hint>
cat
</role-hint>
<implementation>
com.site.dal.jdbc.datasource.JdbcDataSource
</implementation>
<configuration>
<id>
cat
</id>
<maximum-pool-size>
3
</maximum-pool-size>
<connection-timeout>
1s
</connection-timeout>
<idle-timeout>
10m
</idle-timeout>
<statement-cache-size>
1000
</statement-cache-size>
<properties>
<driver>
${jdbc.driver}
</driver>
<URL>
${jdbc.url}
</URL>
<user>
${jdbc.user}
</user>
<password>
${jdbc.password}
</password>
<connectionProperties>
<![CDATA[${jdbc.connectionProperties}]]>
</connectionProperties>
</properties>
</configuration>
<requirements>
<requirement>
<role>
com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager
</role>
</requirement>
</requirements>
</component>
<component>
<role>
com.site.dal.jdbc.mapping.TableProvider
</role>
<role-hint>
sqlreport
</role-hint>
<implementation>
com.site.dal.jdbc.mapping.SimpleTableProvider
</implementation>
<configuration>
<logical-table-name>
sqlreport
</logical-table-name>
<physical-table-name>
sqlreport
</physical-table-name>
<data-source-name>
cat
</data-source-name>
</configuration>
</component>
<component>
<role>
com.dianping.cat.job.sql.dal.SqlreportDao
</role>
<implementation>
com.dianping.cat.job.sql.dal.SqlreportDao
</implementation>
<requirements>
<requirement>
<role>
com.site.dal.jdbc.QueryEngine
</role>
</requirement>
</requirements>
</component>
</components>
</plexus>
cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java
浏览文件 @
d7c3d513
...
...
@@ -22,56 +22,71 @@ public class SqlJobDataProduceTest extends CatTestCase {
MessageStorage
storage
=
lookup
(
MessageStorage
.
class
,
"hdfs"
);
MessageProducer
producer
=
lookup
(
MessageProducer
.
class
);
InMemoryQueue
queue
=
lookup
(
InMemoryQueue
.
class
);
for
(
int
i
=
0
;
i
<
10000
;
i
++)
{
Transaction
t
=
producer
.
newTransaction
(
"URL"
,
"MyPage"
+
(
int
)
(
i
/
500
));
try
{
// do your business here
t
.
addData
(
"k1"
,
"v1"
);
t
.
addData
(
"k2"
,
"v2"
);
t
.
addData
(
"k3"
,
"v3"
);
Thread
.
sleep
(
1
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
String
sqlStatement
=
"SQLStatement"
+
i
/
500
;
String
sqlParaMeter
=
"SqlParaMeters"
;
Transaction
sqlTran
=
producer
.
newTransaction
(
"SQL"
,
sqlStatement
);
producer
.
logEvent
(
"SQL.PARAM"
,
sqlStatement
,
Transaction
.
SUCCESS
,
Stringizers
.
forJson
().
compact
().
from
(
sqlParaMeter
));
sqlTran
.
addData
(
sqlStatement
+
"detail..."
);
sqlTran
.
complete
();
if
(
i
%
2
==
1
)
{
sqlTran
.
setStatus
(
Message
.
SUCCESS
);
}
else
{
sqlTran
.
setStatus
(
"Error"
);
for
(
int
i
=
0
;
i
<
3
;
i
++)
{
for
(
int
j
=
0
;
j
<
10000
;
j
++)
{
Transaction
t
=
producer
.
newTransaction
(
"URL"
,
"MyPage"
+
(
int
)
(
j
/
500
));
try
{
// do your business here
t
.
addData
(
"k1"
,
"v1"
);
t
.
addData
(
"k2"
,
"v2"
);
t
.
addData
(
"k3"
,
"v3"
);
Thread
.
sleep
(
1
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
String
sqlName
=
"SQLStatement"
+
j
/
500
;
String
sqlParaMeter
=
"SQLParaMeter"
+
j
/
500
;
String
sqlStatement
=
"select * from table where id=\"1\"\n order by id desc"
;
Transaction
sqlTran
=
producer
.
newTransaction
(
"SQL"
,
sqlName
);
producer
.
logEvent
(
"SQL.PARAM"
,
sqlParaMeter
,
Transaction
.
SUCCESS
,
Stringizers
.
forJson
().
compact
().
from
(
sqlParaMeter
));
sqlTran
.
addData
(
sqlStatement
);
String
sqlInternalName
=
"SQLStatement Internal"
+
j
/
500
;
String
sqlParaInternal
=
"SQLParaMeter Internal"
+
j
/
500
;
String
sqlInternal
=
"select * from intenal table where id=\"1\"\n order by id desc"
;
Transaction
internal
=
producer
.
newTransaction
(
"SQL"
,
sqlInternalName
);
producer
.
logEvent
(
"SQL.PARAM"
,
sqlParaInternal
,
Transaction
.
SUCCESS
,
Stringizers
.
forJson
().
compact
()
.
from
(
sqlParaInternal
));
internal
.
addData
(
sqlInternal
);
internal
.
complete
();
if
(
j
%
2
==
1
)
{
internal
.
setStatus
(
Message
.
SUCCESS
);
}
else
{
internal
.
setStatus
(
"Error"
);
}
sqlTran
.
complete
();
DefaultTransaction
sqlInternalTran
=
(
DefaultTransaction
)
internal
;
sqlInternalTran
.
setDuration
(
j
%
100
+
100
);
if
(
j
%
2
==
1
)
{
sqlTran
.
setStatus
(
Message
.
SUCCESS
);
}
else
{
sqlTran
.
setStatus
(
"Error"
);
}
DefaultTransaction
def
=
(
DefaultTransaction
)
sqlTran
;
def
.
setDuration
(
j
%
100
+
50
);
t
.
setStatus
(
Message
.
SUCCESS
);
}
catch
(
Exception
e
)
{
t
.
setStatus
(
e
);
}
finally
{
t
.
complete
();
}
DefaultTransaction
def
=
(
DefaultTransaction
)
sqlTran
;
def
.
setDuration
(
i
%
100
);
t
.
setStatus
(
Message
.
SUCCESS
);
}
catch
(
Exception
e
)
{
t
.
setStatus
(
e
);
}
finally
{
t
.
complete
();
MessageTree
tree
=
queue
.
poll
(
0
);
tree
.
setDomain
(
"domain"
+
i
);
storage
.
store
(
tree
);
}
MessageTree
tree
=
queue
.
poll
(
0
);
//tree.setDomain("Domain" + i % 3);
storage
.
store
(
tree
);
}
((
HdfsMessageStorage
)
storage
).
dispose
();
}
}
cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java
0 → 100644
浏览文件 @
d7c3d513
package
com.dianping.cat.job.sql
;
import
java.text.DecimalFormat
;
import
junit.framework.Assert
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.junit.runners.JUnit4
;
import
com.dianping.cat.job.sql.database.SqlReportRecord
;
@RunWith
(
JUnit4
.
class
)
public
class
SqlRecordTest
{
@Test
public
void
test
()
{
DecimalFormat
df
=
new
DecimalFormat
(
"#.##"
);
Assert
.
assertEquals
(
"1.23"
,
df
.
format
(
1.234567
));
}
@Test
public
void
test2
()
{
String
text
=
"domain1 SQLStatement-Internal9 insert into mysql where is='sfsdf' 500 500 500 100 199 74750 11591750 147 www.sina.com"
;
SqlReportRecord
record
=
new
SqlReportRecord
(
"20120309/11"
,
text
);
Assert
.
assertEquals
(
"domain1"
,
record
.
getDomain
());
Assert
.
assertEquals
(
"SQLStatement-Internal9"
,
record
.
getName
());
Assert
.
assertEquals
(
"insert into mysql where is='sfsdf'"
,
record
.
getStatement
());
Assert
.
assertEquals
(
500
,
record
.
getTotalCount
());
Assert
.
assertEquals
(
500
,
record
.
getFailureCount
());
Assert
.
assertEquals
(
500
,
record
.
getLongCount
());
Assert
.
assertEquals
(
100.0
,
record
.
getMin
());
Assert
.
assertEquals
(
199.0
,
record
.
getMax
());
Assert
.
assertEquals
(
74750.0
,
record
.
getSum
());
Assert
.
assertEquals
(
11591750.0
,
record
.
getSum2
());
Assert
.
assertEquals
(
147.0
,
record
.
getAvg2
());
Assert
.
assertEquals
(
"www.sina.com"
,
record
.
getSampleLink
());
}
}
pom.xml
浏览文件 @
d7c3d513
...
...
@@ -63,7 +63,7 @@
<dependency>
<groupId>
com.site.dal
</groupId>
<artifactId>
dal-jdbc
</artifactId>
<version>
1.0.0
</version>
<version>
1.0.0
-a1
</version>
</dependency>
<dependency>
<groupId>
com.site.app
</groupId>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录