Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
梦中观雨
cat
提交
0762a4e6
C
cat
项目概览
梦中观雨
/
cat
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
cat
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0762a4e6
编写于
4月 21, 2014
作者:
Y
youyong205
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix bug
上级
d576898b
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
647 addition
and
13 deletion
+647
-13
cat-client/src/main/java/com/dianping/cat/message/spi/codec/OldPlainTextMessageCodec.java
...nping/cat/message/spi/codec/OldPlainTextMessageCodec.java
+571
-0
cat-client/src/main/java/com/dianping/cat/message/spi/codec/PlainTextMessageCodec.java
...dianping/cat/message/spi/codec/PlainTextMessageCodec.java
+5
-3
cat-core/src/test/java/com/dianping/cat/message/spi/core/PlainTextCodecTest.java
...com/dianping/cat/message/spi/core/PlainTextCodecTest.java
+63
-2
script/Cat.sql
script/Cat.sql
+8
-8
未找到文件。
cat-client/src/main/java/com/dianping/cat/message/spi/codec/OldPlainTextMessageCodec.java
0 → 100644
浏览文件 @
0762a4e6
package
com.dianping.cat.message.spi.codec
;
import
java.io.UnsupportedEncodingException
;
import
java.nio.charset.Charset
;
import
java.text.ParseException
;
import
java.text.SimpleDateFormat
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Stack
;
import
java.util.TimeZone
;
import
java.util.concurrent.ArrayBlockingQueue
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.codehaus.plexus.logging.LogEnabled
;
import
org.codehaus.plexus.logging.Logger
;
import
org.jboss.netty.buffer.ChannelBuffer
;
import
org.unidal.lookup.annotation.Inject
;
import
com.dianping.cat.message.Event
;
import
com.dianping.cat.message.Heartbeat
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.Metric
;
import
com.dianping.cat.message.Trace
;
import
com.dianping.cat.message.Transaction
;
import
com.dianping.cat.message.internal.DefaultEvent
;
import
com.dianping.cat.message.internal.DefaultHeartbeat
;
import
com.dianping.cat.message.internal.DefaultMetric
;
import
com.dianping.cat.message.internal.DefaultTrace
;
import
com.dianping.cat.message.internal.DefaultTransaction
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.message.spi.internal.DefaultMessageTree
;
public
class
OldPlainTextMessageCodec
implements
MessageCodec
,
LogEnabled
{
public
static
final
String
ID
=
"plain-text"
;
private
static
final
String
VERSION
=
"PT1"
;
// plain text version 1
private
static
final
byte
TAB
=
'\t'
;
// tab character
private
static
final
byte
LF
=
'\n'
;
// line feed character
@Inject
private
BufferWriter
m_writer
=
new
EscapingBufferWriter
();
private
BufferHelper
m_bufferHelper
=
new
BufferHelper
(
m_writer
);
private
DateHelper
m_dateHelper
=
new
DateHelper
();
private
Logger
m_logger
;
@Override
public
MessageTree
decode
(
ChannelBuffer
buf
)
{
MessageTree
tree
=
new
DefaultMessageTree
();
decode
(
buf
,
tree
);
return
tree
;
}
@Override
public
void
decode
(
ChannelBuffer
buf
,
MessageTree
tree
)
{
decodeHeader
(
buf
,
tree
);
if
(
buf
.
readableBytes
()
>
0
)
{
decodeMessage
(
buf
,
tree
);
}
}
protected
void
decodeHeader
(
ChannelBuffer
buf
,
MessageTree
tree
)
{
BufferHelper
helper
=
m_bufferHelper
;
String
id
=
helper
.
read
(
buf
,
TAB
);
String
domain
=
helper
.
read
(
buf
,
TAB
);
String
hostName
=
helper
.
read
(
buf
,
TAB
);
String
ipAddress
=
helper
.
read
(
buf
,
TAB
);
String
threadGroupName
=
helper
.
read
(
buf
,
TAB
);
String
threadId
=
helper
.
read
(
buf
,
TAB
);
String
threadName
=
helper
.
read
(
buf
,
TAB
);
String
messageId
=
helper
.
read
(
buf
,
TAB
);
String
parentMessageId
=
helper
.
read
(
buf
,
TAB
);
String
rootMessageId
=
helper
.
read
(
buf
,
TAB
);
String
sessionToken
=
helper
.
read
(
buf
,
LF
);
if
(
VERSION
.
equals
(
id
))
{
tree
.
setDomain
(
domain
);
tree
.
setHostName
(
hostName
);
tree
.
setIpAddress
(
ipAddress
);
tree
.
setThreadGroupName
(
threadGroupName
);
tree
.
setThreadId
(
threadId
);
tree
.
setThreadName
(
threadName
);
tree
.
setMessageId
(
messageId
);
tree
.
setParentMessageId
(
parentMessageId
);
tree
.
setRootMessageId
(
rootMessageId
);
tree
.
setSessionToken
(
sessionToken
);
}
else
{
throw
new
RuntimeException
(
String
.
format
(
"Unrecognized id(%s) for plain text message codec!"
,
id
));
}
}
protected
Message
decodeLine
(
ChannelBuffer
buf
,
DefaultTransaction
parent
,
Stack
<
DefaultTransaction
>
stack
,
MessageTree
tree
)
{
BufferHelper
helper
=
m_bufferHelper
;
byte
identifier
=
buf
.
readByte
();
String
timestamp
=
helper
.
read
(
buf
,
TAB
);
String
type
=
helper
.
readRaw
(
buf
,
TAB
);
String
name
=
helper
.
readRaw
(
buf
,
TAB
);
switch
(
identifier
)
{
case
't'
:
DefaultTransaction
transaction
=
new
DefaultTransaction
(
type
,
name
,
null
);
helper
.
read
(
buf
,
LF
);
// get rid of line feed
transaction
.
setTimestamp
(
m_dateHelper
.
parse
(
timestamp
));
if
(
parent
!=
null
)
{
parent
.
addChild
(
transaction
);
}
stack
.
push
(
parent
);
return
transaction
;
case
'A'
:
DefaultTransaction
tran
=
new
DefaultTransaction
(
type
,
name
,
null
);
String
status
=
helper
.
readRaw
(
buf
,
TAB
);
String
duration
=
helper
.
read
(
buf
,
TAB
);
String
data
=
helper
.
readRaw
(
buf
,
TAB
);
helper
.
read
(
buf
,
LF
);
// get rid of line feed
tran
.
setTimestamp
(
m_dateHelper
.
parse
(
timestamp
));
tran
.
setStatus
(
status
);
tran
.
addData
(
data
);
long
d
=
Long
.
parseLong
(
duration
.
substring
(
0
,
duration
.
length
()
-
2
));
tran
.
setDurationInMicros
(
d
);
if
(
parent
!=
null
)
{
parent
.
addChild
(
tran
);
return
parent
;
}
else
{
return
tran
;
}
case
'T'
:
String
transactionStatus
=
helper
.
readRaw
(
buf
,
TAB
);
String
transactionDuration
=
helper
.
read
(
buf
,
TAB
);
String
transactionData
=
helper
.
readRaw
(
buf
,
TAB
);
helper
.
read
(
buf
,
LF
);
// get rid of line feed
parent
.
setStatus
(
transactionStatus
);
parent
.
addData
(
transactionData
);
long
transactionD
=
Long
.
parseLong
(
transactionDuration
.
substring
(
0
,
transactionDuration
.
length
()
-
2
));
parent
.
setDurationInMicros
(
transactionD
);
return
stack
.
pop
();
case
'E'
:
DefaultEvent
event
=
new
DefaultEvent
(
type
,
name
);
String
eventStatus
=
helper
.
readRaw
(
buf
,
TAB
);
String
eventData
=
helper
.
readRaw
(
buf
,
TAB
);
helper
.
read
(
buf
,
LF
);
// get rid of line feed
event
.
setTimestamp
(
m_dateHelper
.
parse
(
timestamp
));
event
.
setStatus
(
eventStatus
);
event
.
addData
(
eventData
);
if
(
parent
!=
null
)
{
parent
.
addChild
(
event
);
return
parent
;
}
else
{
return
event
;
}
case
'M'
:
DefaultMetric
metric
=
new
DefaultMetric
(
type
,
name
);
String
metricStatus
=
helper
.
readRaw
(
buf
,
TAB
);
String
metricData
=
helper
.
readRaw
(
buf
,
TAB
);
helper
.
read
(
buf
,
LF
);
// get rid of line feed
metric
.
setTimestamp
(
m_dateHelper
.
parse
(
timestamp
));
metric
.
setStatus
(
metricStatus
);
metric
.
addData
(
metricData
);
if
(
parent
!=
null
)
{
parent
.
addChild
(
metric
);
return
parent
;
}
else
{
return
metric
;
}
case
'L'
:
DefaultTrace
trace
=
new
DefaultTrace
(
type
,
name
);
String
traceStatus
=
helper
.
readRaw
(
buf
,
TAB
);
String
traceData
=
helper
.
readRaw
(
buf
,
TAB
);
helper
.
read
(
buf
,
LF
);
// get rid of line feed
trace
.
setTimestamp
(
m_dateHelper
.
parse
(
timestamp
));
trace
.
setStatus
(
traceStatus
);
trace
.
addData
(
traceData
);
if
(
parent
!=
null
)
{
parent
.
addChild
(
trace
);
return
parent
;
}
else
{
return
trace
;
}
case
'H'
:
DefaultHeartbeat
heartbeat
=
new
DefaultHeartbeat
(
type
,
name
);
String
heartbeatStatus
=
helper
.
readRaw
(
buf
,
TAB
);
String
heartbeatData
=
helper
.
readRaw
(
buf
,
TAB
);
helper
.
read
(
buf
,
LF
);
// get rid of line feed
heartbeat
.
setTimestamp
(
m_dateHelper
.
parse
(
timestamp
));
heartbeat
.
setStatus
(
heartbeatStatus
);
heartbeat
.
addData
(
heartbeatData
);
if
(
parent
!=
null
)
{
parent
.
addChild
(
heartbeat
);
return
parent
;
}
else
{
return
heartbeat
;
}
default
:
m_logger
.
warn
(
"Unknown identifier("
+
(
char
)
identifier
+
") of message: "
+
buf
.
toString
(
Charset
.
forName
(
"utf-8"
)));
throw
new
RuntimeException
(
"Unknown identifier int name"
);
}
}
protected
void
decodeMessage
(
ChannelBuffer
buf
,
MessageTree
tree
)
{
Stack
<
DefaultTransaction
>
stack
=
new
Stack
<
DefaultTransaction
>();
Message
parent
=
decodeLine
(
buf
,
null
,
stack
,
tree
);
tree
.
setMessage
(
parent
);
while
(
buf
.
readableBytes
()
>
0
)
{
Message
message
=
decodeLine
(
buf
,
(
DefaultTransaction
)
parent
,
stack
,
tree
);
if
(
message
instanceof
DefaultTransaction
)
{
parent
=
message
;
}
else
{
break
;
}
}
}
@Override
public
void
enableLogging
(
Logger
logger
)
{
m_logger
=
logger
;
}
@Override
public
void
encode
(
MessageTree
tree
,
ChannelBuffer
buf
)
{
int
count
=
0
;
int
index
=
buf
.
writerIndex
();
buf
.
writeInt
(
0
);
// place-holder
count
+=
encodeHeader
(
tree
,
buf
);
if
(
tree
.
getMessage
()
!=
null
)
{
count
+=
encodeMessage
(
tree
.
getMessage
(),
buf
);
}
buf
.
setInt
(
index
,
count
);
}
protected
int
encodeHeader
(
MessageTree
tree
,
ChannelBuffer
buf
)
{
BufferHelper
helper
=
m_bufferHelper
;
int
count
=
0
;
count
+=
helper
.
write
(
buf
,
VERSION
);
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getDomain
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getHostName
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getIpAddress
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getThreadGroupName
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getThreadId
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getThreadName
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getMessageId
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getParentMessageId
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getRootMessageId
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
write
(
buf
,
tree
.
getSessionToken
());
count
+=
helper
.
write
(
buf
,
LF
);
return
count
;
}
protected
int
encodeLine
(
Message
message
,
ChannelBuffer
buf
,
char
type
,
Policy
policy
)
{
BufferHelper
helper
=
m_bufferHelper
;
int
count
=
0
;
count
+=
helper
.
write
(
buf
,
(
byte
)
type
);
if
(
type
==
'T'
&&
message
instanceof
Transaction
)
{
long
duration
=
((
Transaction
)
message
).
getDurationInMillis
();
count
+=
helper
.
write
(
buf
,
m_dateHelper
.
format
(
message
.
getTimestamp
()
+
duration
));
}
else
{
count
+=
helper
.
write
(
buf
,
m_dateHelper
.
format
(
message
.
getTimestamp
()));
}
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
writeRaw
(
buf
,
message
.
getType
());
count
+=
helper
.
write
(
buf
,
TAB
);
count
+=
helper
.
writeRaw
(
buf
,
message
.
getName
());
count
+=
helper
.
write
(
buf
,
TAB
);
if
(
policy
!=
Policy
.
WITHOUT_STATUS
)
{
count
+=
helper
.
writeRaw
(
buf
,
message
.
getStatus
());
count
+=
helper
.
write
(
buf
,
TAB
);
Object
data
=
message
.
getData
();
if
(
policy
==
Policy
.
WITH_DURATION
&&
message
instanceof
Transaction
)
{
long
duration
=
((
Transaction
)
message
).
getDurationInMicros
();
count
+=
helper
.
write
(
buf
,
String
.
valueOf
(
duration
));
count
+=
helper
.
write
(
buf
,
"us"
);
count
+=
helper
.
write
(
buf
,
TAB
);
}
count
+=
helper
.
writeRaw
(
buf
,
String
.
valueOf
(
data
));
count
+=
helper
.
write
(
buf
,
TAB
);
}
count
+=
helper
.
write
(
buf
,
LF
);
return
count
;
}
public
int
encodeMessage
(
Message
message
,
ChannelBuffer
buf
)
{
if
(
message
instanceof
Transaction
)
{
Transaction
transaction
=
(
Transaction
)
message
;
List
<
Message
>
children
=
transaction
.
getChildren
();
if
(
children
.
isEmpty
())
{
return
encodeLine
(
transaction
,
buf
,
'A'
,
Policy
.
WITH_DURATION
);
}
else
{
int
count
=
0
;
int
len
=
children
.
size
();
count
+=
encodeLine
(
transaction
,
buf
,
't'
,
Policy
.
WITHOUT_STATUS
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
Message
child
=
children
.
get
(
i
);
count
+=
encodeMessage
(
child
,
buf
);
}
count
+=
encodeLine
(
transaction
,
buf
,
'T'
,
Policy
.
WITH_DURATION
);
return
count
;
}
}
else
if
(
message
instanceof
Event
)
{
return
encodeLine
(
message
,
buf
,
'E'
,
Policy
.
DEFAULT
);
}
else
if
(
message
instanceof
Trace
)
{
return
encodeLine
(
message
,
buf
,
'L'
,
Policy
.
DEFAULT
);
}
else
if
(
message
instanceof
Metric
)
{
return
encodeLine
(
message
,
buf
,
'M'
,
Policy
.
DEFAULT
);
}
else
if
(
message
instanceof
Heartbeat
)
{
return
encodeLine
(
message
,
buf
,
'H'
,
Policy
.
DEFAULT
);
}
else
{
throw
new
RuntimeException
(
String
.
format
(
"Unsupported message type: %s."
,
message
));
}
}
public
void
setBufferWriter
(
BufferWriter
writer
)
{
m_writer
=
writer
;
m_bufferHelper
=
new
BufferHelper
(
m_writer
);
}
protected
static
class
BufferHelper
{
private
BufferWriter
m_writer
;
public
BufferHelper
(
BufferWriter
writer
)
{
m_writer
=
writer
;
}
public
String
read
(
ChannelBuffer
buf
,
byte
separator
)
{
int
count
=
buf
.
bytesBefore
(
separator
);
if
(
count
<
0
)
{
return
null
;
}
else
{
byte
[]
data
=
new
byte
[
count
];
buf
.
readBytes
(
data
);
buf
.
readByte
();
// get rid of separator
return
new
String
(
data
);
}
}
public
String
readRaw
(
ChannelBuffer
buf
,
byte
separator
)
{
try
{
int
count
=
buf
.
bytesBefore
(
separator
);
if
(
count
<
0
)
{
return
null
;
}
else
{
byte
[]
data
=
new
byte
[
count
];
String
str
;
buf
.
readBytes
(
data
);
buf
.
readByte
();
// get rid of separator
int
length
=
data
.
length
;
int
writeIndex
=
0
;
for
(
int
i
=
0
;
i
<
length
;
i
++)
{
if
(
data
[
i
]
==
'\\'
)
{
if
(
i
+
1
<
length
)
{
byte
b
=
data
[
i
+
1
];
if
(
b
==
't'
)
{
data
[
writeIndex
]
=
'\t'
;
i
++;
}
else
if
(
b
==
'r'
)
{
data
[
writeIndex
]
=
'\r'
;
i
++;
}
else
if
(
b
==
'n'
)
{
data
[
writeIndex
]
=
'\n'
;
i
++;
}
else
{
data
[
writeIndex
]
=
'\\'
;
}
}
else
{
data
[
writeIndex
]
=
'\\'
;
}
}
else
{
data
[
writeIndex
]
=
data
[
i
];
}
writeIndex
++;
}
try
{
str
=
new
String
(
data
,
0
,
writeIndex
,
"utf-8"
);
}
catch
(
UnsupportedEncodingException
e
)
{
str
=
new
String
(
data
,
0
,
length
);
}
return
str
;
}
}
finally
{
}
}
public
int
write
(
ChannelBuffer
buf
,
byte
b
)
{
buf
.
writeByte
(
b
);
return
1
;
}
public
int
write
(
ChannelBuffer
buf
,
String
str
)
{
if
(
str
==
null
)
{
str
=
"null"
;
}
byte
[]
data
=
str
.
getBytes
();
buf
.
writeBytes
(
data
);
return
data
.
length
;
}
public
int
writeRaw
(
ChannelBuffer
buf
,
String
str
)
{
if
(
str
==
null
)
{
str
=
"null"
;
}
byte
[]
data
;
try
{
data
=
str
.
getBytes
(
"utf-8"
);
}
catch
(
UnsupportedEncodingException
e
)
{
data
=
str
.
getBytes
();
}
return
m_writer
.
writeTo
(
buf
,
data
);
}
}
/**
* Thread safe date helper class. DateFormat is NOT thread safe.
*/
protected
static
class
DateHelper
{
private
BlockingQueue
<
SimpleDateFormat
>
m_formats
=
new
ArrayBlockingQueue
<
SimpleDateFormat
>(
20
);
private
Map
<
String
,
Long
>
m_map
=
new
ConcurrentHashMap
<
String
,
Long
>();
public
String
format
(
long
timestamp
)
{
SimpleDateFormat
format
=
m_formats
.
poll
();
if
(
format
==
null
)
{
format
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss.SSS"
);
format
.
setTimeZone
(
TimeZone
.
getTimeZone
(
"GMT+8"
));
}
try
{
return
format
.
format
(
new
Date
(
timestamp
));
}
finally
{
if
(
m_formats
.
remainingCapacity
()
>
0
)
{
m_formats
.
offer
(
format
);
}
}
}
public
long
parse
(
String
str
)
{
int
len
=
str
.
length
();
String
date
=
str
.
substring
(
0
,
10
);
Long
baseline
=
m_map
.
get
(
date
);
if
(
baseline
==
null
)
{
try
{
SimpleDateFormat
format
=
new
SimpleDateFormat
(
"yyyy-MM-dd"
);
format
.
setTimeZone
(
TimeZone
.
getTimeZone
(
"GMT+8"
));
baseline
=
format
.
parse
(
date
).
getTime
();
m_map
.
put
(
date
,
baseline
);
}
catch
(
ParseException
e
)
{
return
-
1
;
}
}
long
time
=
baseline
.
longValue
();
long
metric
=
1
;
boolean
millisecond
=
true
;
for
(
int
i
=
len
-
1
;
i
>
10
;
i
--)
{
char
ch
=
str
.
charAt
(
i
);
if
(
ch
>=
'0'
&&
ch
<=
'9'
)
{
time
+=
(
ch
-
'0'
)
*
metric
;
metric
*=
10
;
}
else
if
(
millisecond
)
{
millisecond
=
false
;
}
else
{
metric
=
metric
/
100
*
60
;
}
}
return
time
;
}
}
protected
static
enum
Policy
{
DEFAULT
,
WITHOUT_STATUS
,
WITH_DURATION
;
public
static
Policy
getByMessageIdentifier
(
byte
identifier
)
{
switch
(
identifier
)
{
case
't'
:
return
WITHOUT_STATUS
;
case
'T'
:
case
'A'
:
return
WITH_DURATION
;
case
'E'
:
case
'H'
:
return
DEFAULT
;
default
:
return
DEFAULT
;
}
}
}
}
cat-client/src/main/java/com/dianping/cat/message/spi/codec/PlainTextMessageCodec.java
浏览文件 @
0762a4e6
...
@@ -98,7 +98,8 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
...
@@ -98,7 +98,8 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
}
}
}
}
protected
Message
decodeLine
(
ChannelBuffer
buf
,
DefaultTransaction
parent
,
Stack
<
DefaultTransaction
>
stack
,
MessageTree
tree
)
{
protected
Message
decodeLine
(
ChannelBuffer
buf
,
DefaultTransaction
parent
,
Stack
<
DefaultTransaction
>
stack
,
MessageTree
tree
)
{
BufferHelper
helper
=
m_bufferHelper
;
BufferHelper
helper
=
m_bufferHelper
;
byte
identifier
=
buf
.
readByte
();
byte
identifier
=
buf
.
readByte
();
String
timestamp
=
helper
.
read
(
buf
,
TAB
);
String
timestamp
=
helper
.
read
(
buf
,
TAB
);
...
@@ -217,7 +218,8 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
...
@@ -217,7 +218,8 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
return
heartbeat
;
return
heartbeat
;
}
}
default
:
default
:
m_logger
.
warn
(
"Unknown identifier("
+
(
char
)
identifier
+
") of message: "
+
buf
.
toString
(
Charset
.
forName
(
"utf-8"
)));
m_logger
.
warn
(
"Unknown identifier("
+
(
char
)
identifier
+
") of message: "
+
buf
.
toString
(
Charset
.
forName
(
"utf-8"
)));
throw
new
RuntimeException
(
"Unknown identifier int name"
);
throw
new
RuntimeException
(
"Unknown identifier int name"
);
}
}
}
}
...
@@ -401,7 +403,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
...
@@ -401,7 +403,7 @@ public class PlainTextMessageCodec implements MessageCodec, LogEnabled {
break
;
break
;
}
}
if
(
index
>
data
.
length
)
{
if
(
index
>
data
.
length
-
1
)
{
char
[]
data2
=
new
char
[
to
-
from
];
char
[]
data2
=
new
char
[
to
-
from
];
System
.
arraycopy
(
data
,
0
,
data2
,
0
,
index
);
System
.
arraycopy
(
data
,
0
,
data2
,
0
,
index
);
...
...
cat-core/src/test/java/com/dianping/cat/message/spi/core/PlainTextCodecTest.java
浏览文件 @
0762a4e6
...
@@ -13,11 +13,14 @@ import com.dianping.cat.message.Transaction;
...
@@ -13,11 +13,14 @@ import com.dianping.cat.message.Transaction;
import
com.dianping.cat.message.internal.DefaultTransaction
;
import
com.dianping.cat.message.internal.DefaultTransaction
;
import
com.dianping.cat.message.internal.MockMessageBuilder
;
import
com.dianping.cat.message.internal.MockMessageBuilder
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.message.spi.codec.OldPlainTextMessageCodec
;
import
com.dianping.cat.message.spi.codec.PlainTextMessageCodec
;
import
com.dianping.cat.message.spi.codec.PlainTextMessageCodec
;
import
com.dianping.cat.message.spi.internal.DefaultMessageTree
;
import
com.dianping.cat.message.spi.internal.DefaultMessageTree
;
public
class
PlainTextCodecTest
{
public
class
PlainTextCodecTest
{
private
int
count
=
100000
;
@Test
@Test
public
void
test
()
throws
InterruptedException
{
public
void
test
()
throws
InterruptedException
{
MessageTree
tree
=
buildMessages
();
MessageTree
tree
=
buildMessages
();
...
@@ -34,22 +37,80 @@ public class PlainTextCodecTest {
...
@@ -34,22 +37,80 @@ public class PlainTextCodecTest {
Thread
.
sleep
(
1000
);
Thread
.
sleep
(
1000
);
}
}
public
void
testMany
()
throws
InterruptedException
{
MessageTree
tree
=
buildMessages
();
PlainTextMessageCodec
codec
=
new
PlainTextMessageCodec
();
ChannelBuffer
buf
=
ChannelBuffers
.
dynamicBuffer
(
8192
);
codec
.
encode
(
tree
,
buf
);
buf
.
readInt
();
buf
.
markReaderIndex
();
long
current
=
System
.
currentTimeMillis
();
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
MessageTree
tree2
=
new
DefaultMessageTree
();
codec
.
decode
(
buf
,
tree2
);
buf
.
resetReaderIndex
();
}
System
.
out
.
println
(
"Cost:"
+
(
System
.
currentTimeMillis
()
-
current
));
Thread
.
sleep
(
1000
);
}
public
void
testManyOld
()
throws
InterruptedException
{
MessageTree
tree
=
buildMessages
();
OldPlainTextMessageCodec
codec
=
new
OldPlainTextMessageCodec
();
ChannelBuffer
buf
=
ChannelBuffers
.
dynamicBuffer
(
8192
);
codec
.
encode
(
tree
,
buf
);
buf
.
readInt
();
buf
.
markReaderIndex
();
long
current
=
System
.
currentTimeMillis
();
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
MessageTree
tree2
=
new
DefaultMessageTree
();
codec
.
decode
(
buf
,
tree2
);
buf
.
resetReaderIndex
();
}
System
.
out
.
println
(
"Cost:"
+
(
System
.
currentTimeMillis
()
-
current
));
Thread
.
sleep
(
1000
);
}
public
MessageTree
buildMessages
()
{
public
MessageTree
buildMessages
()
{
Transaction
t
=
Cat
.
newTransaction
(
"type1"
,
"name1\t\n\t\n\\"
);
Transaction
t
=
Cat
.
newTransaction
(
"type1"
,
"name1\t\n\t\n\\"
);
Transaction
t2
=
Cat
.
newTransaction
(
"type2"
,
"name\t\n\t\n2\\"
);
Transaction
t2
=
Cat
.
newTransaction
(
"type2"
,
"name\t\n\t\n2\\"
);
Transaction
t3
=
Cat
.
newTransaction
(
"type3"
,
"name3\t\n\t\n\\"
);
Transaction
t3
=
Cat
.
newTransaction
(
"type3"
,
"name3\t\n\t\n\\"
);
Transaction
t4
=
Cat
.
newTransaction
(
"type4"
,
"name4\t\n\t\n\\"
);
Transaction
t4
=
Cat
.
newTransaction
(
"type4"
,
"name4\t\n\t\n\\"
);
Transaction
t5
=
Cat
.
newTransaction
(
"type4"
,
"name4\t\n\t\n\\"
);
Transaction
t6
=
Cat
.
newTransaction
(
"type4"
,
"name4\t\n\t\n\\"
);
Transaction
t7
=
Cat
.
newTransaction
(
"type4"
,
"name4\t\n\t\n\\"
);
Transaction
t8
=
Cat
.
newTransaction
(
"type4"
,
"name4\t\n\t\n\\"
);
Cat
.
logEvent
(
"type1\t\n"
,
"name\t\n"
,
"sdfsdf\t\n"
,
convertException
(
new
NullPointerException
()));
Cat
.
logEvent
(
"type1\t\n"
,
"name\t\n"
,
"sdfsdf\t\n"
,
convertException
(
new
NullPointerException
()));
Cat
.
logHeartbeat
(
"type1\t\n"
,
"name\t\n"
,
"sdfsdf\t\n"
,
convertException
(
new
NullPointerException
()));
Cat
.
logHeartbeat
(
"type1\t\n"
,
"name\t\n"
,
"sdfsdf\t\n"
,
convertException
(
new
NullPointerException
()));
Cat
.
logError
(
new
RuntimeException
());
Cat
.
logError
(
new
RuntimeException
());
for
(
int
i
=
0
;
i
<
50
;
i
++)
{
Cat
.
logEvent
(
"type1\t\n"
,
"name\t\n"
,
"sdfsdf\t\n"
,
""
);
}
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
Cat
.
logError
(
new
RuntimeException
());
}
t5
.
complete
();
t6
.
complete
();
t7
.
complete
();
t8
.
complete
();
t2
.
addData
(
convertException
(
new
NullPointerException
()));
t2
.
addData
(
convertException
(
new
NullPointerException
()));
t2
.
setStatus
(
convertException
(
new
NullPointerException
()));
t2
.
setStatus
(
convertException
(
new
NullPointerException
()));
t2
.
complete
();
t3
.
complete
();
t4
.
complete
();
t4
.
complete
();
t3
.
complete
();
t2
.
complete
();
MessageTree
tree
=
Cat
.
getManager
().
getThreadLocalMessageTree
();
MessageTree
tree
=
Cat
.
getManager
().
getThreadLocalMessageTree
();
t
.
setStatus
(
"sfsf\t\n"
);
t
.
setStatus
(
"sfsf\t\n"
);
...
...
script/Cat.sql
浏览文件 @
0762a4e6
...
@@ -5,7 +5,7 @@ use cat;
...
@@ -5,7 +5,7 @@ use cat;
CREATE
TABLE
`dailygraph`
(
CREATE
TABLE
`dailygraph`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`ip`
varchar
(
2
0
)
NULL
COMMENT
'报表来自于哪台cat-client机器ip, 空串表示合并同domain所有ip'
,
`ip`
varchar
(
5
0
)
NULL
COMMENT
'报表来自于哪台cat-client机器ip, 空串表示合并同domain所有ip'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 3/csv, 默认3'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 3/csv, 默认3'
,
...
@@ -19,7 +19,7 @@ CREATE TABLE `dailygraph` (
...
@@ -19,7 +19,7 @@ CREATE TABLE `dailygraph` (
CREATE
TABLE
`dailyreport`
(
CREATE
TABLE
`dailyreport`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称, transaction, problem...'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称, transaction, problem...'
,
`ip`
varchar
(
2
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-consumer机器'
,
`ip`
varchar
(
5
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-consumer机器'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 默认1'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 默认1'
,
...
@@ -32,7 +32,7 @@ CREATE TABLE `dailyreport` (
...
@@ -32,7 +32,7 @@ CREATE TABLE `dailyreport` (
CREATE
TABLE
`weeklyreport`
(
CREATE
TABLE
`weeklyreport`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称, transaction, problem...'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称, transaction, problem...'
,
`ip`
varchar
(
2
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-consumer机器'
,
`ip`
varchar
(
5
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-consumer机器'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 默认1'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 默认1'
,
...
@@ -45,7 +45,7 @@ CREATE TABLE `weeklyreport` (
...
@@ -45,7 +45,7 @@ CREATE TABLE `weeklyreport` (
CREATE
TABLE
`monthreport`
(
CREATE
TABLE
`monthreport`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称, transaction, problem...'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称, transaction, problem...'
,
`ip`
varchar
(
2
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-consumer机器'
,
`ip`
varchar
(
5
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-consumer机器'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 默认1'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 默认1'
,
...
@@ -58,7 +58,7 @@ CREATE TABLE `monthreport` (
...
@@ -58,7 +58,7 @@ CREATE TABLE `monthreport` (
CREATE
TABLE
`graph`
(
CREATE
TABLE
`graph`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`ip`
varchar
(
2
0
)
NULL
COMMENT
'报表来自于哪台cat-client机器ip, NULL表示合并同domain所有ip'
,
`ip`
varchar
(
5
0
)
NULL
COMMENT
'报表来自于哪台cat-client机器ip, NULL表示合并同domain所有ip'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表处理的Domain信息'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 3/csv, 默认3'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 3/csv, 默认3'
,
...
@@ -94,7 +94,7 @@ CREATE TABLE `report` (
...
@@ -94,7 +94,7 @@ CREATE TABLE `report` (
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表类型, 1/xml, 9/binary 默认1'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表类型, 1/xml, 9/binary 默认1'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`ip`
varchar
(
2
0
)
DEFAULT
NULL
COMMENT
'报表来自于哪台机器'
,
`ip`
varchar
(
5
0
)
DEFAULT
NULL
COMMENT
'报表来自于哪台机器'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表项目'
,
`domain`
varchar
(
50
)
NOT
NULL
COMMENT
'报表项目'
,
`period`
timestamp
NOT
NULL
COMMENT
'报表时间段'
,
`period`
timestamp
NOT
NULL
COMMENT
'报表时间段'
,
`content`
longtext
NULL
,
`content`
longtext
NULL
,
...
@@ -137,7 +137,7 @@ CREATE TABLE `businessReport` (
...
@@ -137,7 +137,7 @@ CREATE TABLE `businessReport` (
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表类型 报表数据格式, 1/Binary, 2/xml , 3/json'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表类型 报表数据格式, 1/Binary, 2/xml , 3/json'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`name`
varchar
(
20
)
NOT
NULL
COMMENT
'报表名称'
,
`ip`
varchar
(
2
0
)
NOT
NULL
COMMENT
'报表来自于哪台机器'
,
`ip`
varchar
(
5
0
)
NOT
NULL
COMMENT
'报表来自于哪台机器'
,
`productLine`
varchar
(
50
)
NOT
NULL
COMMENT
'指标来源于哪个产品组'
,
`productLine`
varchar
(
50
)
NOT
NULL
COMMENT
'指标来源于哪个产品组'
,
`period`
timestamp
NOT
NULL
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'报表时间段'
,
`period`
timestamp
NOT
NULL
DEFAULT
CURRENT_TIMESTAMP
ON
UPDATE
CURRENT_TIMESTAMP
COMMENT
'报表时间段'
,
`content`
longblob
COMMENT
'用于存放报表的具体内容'
,
`content`
longblob
COMMENT
'用于存放报表的具体内容'
,
...
@@ -279,7 +279,7 @@ CREATE TABLE `project` (
...
@@ -279,7 +279,7 @@ CREATE TABLE `project` (
CREATE
TABLE
`topologyGraph`
(
CREATE
TABLE
`topologyGraph`
(
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`id`
int
(
11
)
NOT
NULL
AUTO_INCREMENT
,
`ip`
varchar
(
2
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-client机器ip'
,
`ip`
varchar
(
5
0
)
NOT
NULL
COMMENT
'报表来自于哪台cat-client机器ip'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段,精确到分钟'
,
`period`
datetime
NOT
NULL
COMMENT
'报表时间段,精确到分钟'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 3/binary'
,
`type`
tinyint
(
4
)
NOT
NULL
COMMENT
'报表数据格式, 1/xml, 2/json, 3/binary'
,
`content`
longblob
COMMENT
'用于存放报表的具体内容'
,
`content`
longblob
COMMENT
'用于存放报表的具体内容'
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录