Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
梦中观雨
cat
提交
f5d66f17
C
cat
项目概览
梦中观雨
/
cat
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
cat
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f5d66f17
编写于
12月 29, 2011
作者:
F
Frankie Wu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
define spi, refine api with team
上级
3863bfd8
变更
38
隐藏空白更改
内联
并排
Showing
38 changed file
with
470 addition
and
131 deletion
+470
-131
cat-core/src/main/java/com/dianping/cat/configuration/ComponentsConfigurator.java
...om/dianping/cat/configuration/ComponentsConfigurator.java
+6
-3
cat-core/src/main/java/com/dianping/cat/message/broker/DefaultMessageBroker.java
...com/dianping/cat/message/broker/DefaultMessageBroker.java
+4
-4
cat-core/src/main/java/com/dianping/cat/message/codec/MessageCodec.java
...ain/java/com/dianping/cat/message/codec/MessageCodec.java
+0
-9
cat-core/src/main/java/com/dianping/cat/message/consumer/MessageConsumer.java
...va/com/dianping/cat/message/consumer/MessageConsumer.java
+0
-7
cat-core/src/main/java/com/dianping/cat/message/consumer/MessageConsumerRegistry.java
...ianping/cat/message/consumer/MessageConsumerRegistry.java
+0
-16
cat-core/src/main/java/com/dianping/cat/message/handler/MessageHandler.java
...java/com/dianping/cat/message/handler/MessageHandler.java
+0
-7
cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java
...dianping/cat/message/internal/DefaultMessageProducer.java
+14
-1
cat-core/src/main/java/com/dianping/cat/message/internal/MessageManager.java
...ava/com/dianping/cat/message/internal/MessageManager.java
+39
-22
cat-core/src/main/java/com/dianping/cat/message/io/InMemoryQueue.java
.../main/java/com/dianping/cat/message/io/InMemoryQueue.java
+23
-11
cat-core/src/main/java/com/dianping/cat/message/io/InMemoryReceiver.java
...in/java/com/dianping/cat/message/io/InMemoryReceiver.java
+5
-5
cat-core/src/main/java/com/dianping/cat/message/io/InMemorySender.java
...main/java/com/dianping/cat/message/io/InMemorySender.java
+3
-3
cat-core/src/main/java/com/dianping/cat/message/io/MessageReceiver.java
...ain/java/com/dianping/cat/message/io/MessageReceiver.java
+1
-1
cat-core/src/main/java/com/dianping/cat/message/io/MessageSender.java
.../main/java/com/dianping/cat/message/io/MessageSender.java
+2
-2
cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketReceiver.java
...n/java/com/dianping/cat/message/io/TcpSocketReceiver.java
+5
-5
cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java
...ain/java/com/dianping/cat/message/io/TcpSocketSender.java
+5
-4
cat-core/src/main/java/com/dianping/cat/message/io/Transport.java
.../src/main/java/com/dianping/cat/message/io/Transport.java
+1
-1
cat-core/src/main/java/com/dianping/cat/message/spi/AbstractMessageAnalyzer.java
...com/dianping/cat/message/spi/AbstractMessageAnalyzer.java
+24
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageAnalyzer.java
...in/java/com/dianping/cat/message/spi/MessageAnalyzer.java
+5
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageCodec.java
.../main/java/com/dianping/cat/message/spi/MessageCodec.java
+7
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageConsumer.java
...in/java/com/dianping/cat/message/spi/MessageConsumer.java
+8
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageConsumerRegistry.java
...com/dianping/cat/message/spi/MessageConsumerRegistry.java
+11
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageFilter.java
...main/java/com/dianping/cat/message/spi/MessageFilter.java
+7
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageHandler.java
...ain/java/com/dianping/cat/message/spi/MessageHandler.java
+5
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageQueue.java
.../main/java/com/dianping/cat/message/spi/MessageQueue.java
+7
-0
cat-core/src/main/java/com/dianping/cat/message/spi/MessageTree.java
...c/main/java/com/dianping/cat/message/spi/MessageTree.java
+21
-0
cat-core/src/main/java/com/dianping/cat/message/spi/codec/PlainTextMessageCodec.java
...dianping/cat/message/spi/codec/PlainTextMessageCodec.java
+16
-0
cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageConsumerRegistry.java
.../message/spi/internal/DefaultMessageConsumerRegistry.java
+34
-0
cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageTree.java
...dianping/cat/message/spi/internal/DefaultMessageTree.java
+55
-0
cat-core/src/main/java/com/dianping/cat/message/spi/internal/MessageDispatcher.java
.../dianping/cat/message/spi/internal/MessageDispatcher.java
+43
-0
cat-core/src/main/resources/META-INF/plexus/components.xml
cat-core/src/main/resources/META-INF/plexus/components.xml
+7
-2
cat-core/src/test/java/com/dianping/cat/message/internal/MessageProducerTest.java
...om/dianping/cat/message/internal/MessageProducerTest.java
+47
-0
cat-core/src/test/java/com/dianping/cat/message/internal/MessageProducerTestConfigurator.java
...cat/message/internal/MessageProducerTestConfigurator.java
+32
-0
cat-core/src/test/java/com/dianping/cat/message/io/InMemoryTest.java
...c/test/java/com/dianping/cat/message/io/InMemoryTest.java
+5
-15
cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTest.java
.../test/java/com/dianping/cat/message/io/TcpSocketTest.java
+9
-8
cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTestConfigurator.java
...om/dianping/cat/message/io/TcpSocketTestConfigurator.java
+1
-1
cat-core/src/test/java/com/dianping/cat/message/io/TransportManagerTest.java
...ava/com/dianping/cat/message/io/TransportManagerTest.java
+1
-1
cat-core/src/test/resources/com/dianping/cat/message/internal/MessageProducerTest.xml
...com/dianping/cat/message/internal/MessageProducerTest.xml
+14
-0
cat-core/src/test/resources/com/dianping/cat/message/io/TcpSocketTest.xml
...t/resources/com/dianping/cat/message/io/TcpSocketTest.xml
+3
-3
未找到文件。
cat-core/src/main/java/com/dianping/cat/configuration/ComponentsConfigurator.java
浏览文件 @
f5d66f17
...
...
@@ -6,14 +6,16 @@ import java.util.List;
import
com.dianping.cat.message.MessageProducer
;
import
com.dianping.cat.message.broker.DefaultMessageBroker
;
import
com.dianping.cat.message.broker.MessageBroker
;
import
com.dianping.cat.message.handler.MessageDispatcher
;
import
com.dianping.cat.message.handler.MessageHandler
;
import
com.dianping.cat.message.internal.DefaultMessageProducer
;
import
com.dianping.cat.message.io.InMemoryQueue
;
import
com.dianping.cat.message.io.InMemoryReceiver
;
import
com.dianping.cat.message.io.InMemorySender
;
import
com.dianping.cat.message.io.MessageReceiver
;
import
com.dianping.cat.message.io.MessageSender
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageHandler
;
import
com.dianping.cat.message.spi.codec.PlainTextMessageCodec
;
import
com.dianping.cat.message.spi.internal.MessageDispatcher
;
import
com.site.lookup.configuration.AbstractResourceConfigurator
;
import
com.site.lookup.configuration.Component
;
...
...
@@ -21,7 +23,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public
List
<
Component
>
defineComponents
()
{
List
<
Component
>
all
=
new
ArrayList
<
Component
>();
String
inMemory
=
"in-memory"
;
all
.
add
(
C
(
InMemoryQueue
.
class
));
...
...
@@ -32,6 +33,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all
.
add
(
C
(
MessageProducer
.
class
,
DefaultMessageProducer
.
class
));
all
.
add
(
C
(
MessageCodec
.
class
,
"plain-text"
,
PlainTextMessageCodec
.
class
));
// the following are not used right now
all
.
add
(
C
(
MessageHandler
.
class
,
MessageDispatcher
.
class
)
//
.
req
(
MessageReceiver
.
class
,
inMemory
));
...
...
cat-core/src/main/java/com/dianping/cat/message/broker/DefaultMessageBroker.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.broker
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.handler.MessageHandler
;
import
com.dianping.cat.message.io.MessageReceiver
;
import
com.dianping.cat.message.io.MessageSender
;
import
com.dianping.cat.message.spi.MessageHandler
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.site.lookup.annotation.Inject
;
public
class
DefaultMessageBroker
implements
MessageBroker
{
...
...
@@ -17,8 +17,8 @@ public class DefaultMessageBroker implements MessageBroker {
public
void
run
()
{
m_reciever
.
onMessage
(
new
MessageHandler
()
{
@Override
public
void
handle
(
Message
messag
e
)
{
m_sender
.
send
(
messag
e
);
public
void
handle
(
Message
Tree
tre
e
)
{
m_sender
.
send
(
tre
e
);
}
});
}
...
...
cat-core/src/main/java/com/dianping/cat/message/codec/MessageCodec.java
已删除
100644 → 0
浏览文件 @
3863bfd8
package
com.dianping.cat.message.codec
;
import
com.dianping.cat.message.Message
;
public
interface
MessageCodec
{
public
byte
[]
encode
(
Message
message
);
public
Message
decode
(
byte
[]
data
);
}
cat-core/src/main/java/com/dianping/cat/message/consumer/MessageConsumer.java
已删除
100644 → 0
浏览文件 @
3863bfd8
package
com.dianping.cat.message.consumer
;
import
com.dianping.cat.message.Message
;
public
interface
MessageConsumer
{
public
void
consume
(
Message
message
);
}
cat-core/src/main/java/com/dianping/cat/message/consumer/MessageConsumerRegistry.java
已删除
100644 → 0
浏览文件 @
3863bfd8
package
com.dianping.cat.message.consumer
;
import
java.util.ArrayList
;
import
java.util.List
;
public
class
MessageConsumerRegistry
{
private
List
<
MessageConsumer
>
m_consumers
=
new
ArrayList
<
MessageConsumer
>();
public
List
<
MessageConsumer
>
getConsumers
()
{
return
m_consumers
;
}
public
void
register
(
MessageConsumer
consumer
)
{
m_consumers
.
add
(
consumer
);
}
}
cat-core/src/main/java/com/dianping/cat/message/handler/MessageHandler.java
已删除
100644 → 0
浏览文件 @
3863bfd8
package
com.dianping.cat.message.handler
;
import
com.dianping.cat.message.Message
;
public
interface
MessageHandler
{
public
void
handle
(
Message
message
);
}
cat-core/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.internal
;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable
;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException
;
import
com.dianping.cat.message.Event
;
import
com.dianping.cat.message.Heartbeat
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.MessageProducer
;
import
com.dianping.cat.message.Transaction
;
import
com.dianping.cat.message.io.MessageSender
;
import
com.site.lookup.annotation.Inject
;
public
class
DefaultMessageProducer
implements
MessageProducer
,
Initializable
{
@Inject
private
MessageSender
m_sender
;
@Override
public
void
initialize
()
throws
InitializationException
{
MessageManager
.
INSTANCE
.
initialize
(
m_sender
);
}
public
class
DefaultMessageProducer
implements
MessageProducer
{
@Override
public
void
logEvent
(
String
type
,
String
name
,
String
status
,
String
nameValuePairs
)
{
Event
event
=
newEvent
(
type
,
name
);
...
...
cat-core/src/main/java/com/dianping/cat/message/internal/MessageManager.java
浏览文件 @
f5d66f17
...
...
@@ -6,6 +6,10 @@ import java.util.Stack;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.Transaction
;
import
com.dianping.cat.message.io.MessageSender
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.message.spi.internal.DefaultMessageTree
;
import
com.site.lookup.annotation.Inject
;
public
enum
MessageManager
{
INSTANCE
;
...
...
@@ -17,42 +21,55 @@ public enum MessageManager {
}
};
@Inject
private
MessageSender
m_sender
;
private
boolean
m_initialized
;
public
void
add
(
Message
message
)
{
s_context
.
get
().
add
(
message
);
s_context
.
get
().
add
(
this
,
message
);
}
public
void
end
(
Transaction
transaction
)
{
s_context
.
get
().
end
(
transaction
);
s_context
.
get
().
end
(
t
his
,
t
ransaction
);
}
public
void
start
(
Transaction
transaction
)
{
s_context
.
get
().
start
(
transaction
);
void
flush
(
MessageTree
tree
)
{
if
(
m_sender
!=
null
)
{
m_sender
.
send
(
tree
);
// destroy current thread data
s_context
.
remove
();
}
else
if
(!
m_initialized
)
{
throw
new
IllegalStateException
(
"MessageManager is not initialized yet!"
);
}
}
void
handle
(
Transaction
transaction
)
{
// TODO
System
.
out
.
println
(
transaction
);
public
void
initialize
(
MessageSender
sender
)
{
m_sender
=
sender
;
m_initialized
=
true
;
}
public
void
start
(
Transaction
transaction
)
{
s_context
.
get
().
start
(
transaction
);
}
static
class
Context
{
private
DefaultMessageTree
m_tree
=
new
DefaultMessageTree
();
private
Stack
<
Transaction
>
m_stack
=
new
Stack
<
Transaction
>();
public
void
add
(
Message
message
)
{
if
(!
m_stack
.
isEmpty
())
{
public
void
add
(
MessageManager
manager
,
Message
message
)
{
if
(
m_stack
.
isEmpty
())
{
m_tree
.
setMessage
(
message
);
}
else
{
Transaction
entry
=
m_stack
.
peek
();
entry
.
addChild
(
message
);
}
else
{
// add a mock transaction as its parent
Transaction
t
=
new
FakeTransaction
();
start
(
t
);
t
.
addChild
(
message
);
end
(
t
);
}
}
public
void
end
(
Transaction
transaction
)
{
public
void
end
(
MessageManager
manager
,
Transaction
transaction
)
{
if
(!
m_stack
.
isEmpty
())
{
Transaction
current
=
m_stack
.
peek
();
...
...
@@ -65,7 +82,7 @@ public enum MessageManager {
m_stack
.
pop
();
if
(
m_stack
.
isEmpty
())
{
INSTANCE
.
handle
(
transaction
);
manager
.
flush
(
m_tree
);
}
}
}
...
...
@@ -111,12 +128,11 @@ public enum MessageManager {
}
@Override
public
void
complete
(
)
{
public
void
addChild
(
Message
message
)
{
}
@Override
public
long
getDuration
()
{
return
-
1
;
public
void
complete
()
{
}
@Override
...
...
@@ -125,7 +141,8 @@ public enum MessageManager {
}
@Override
public
void
addChild
(
Message
message
)
{
public
long
getDuration
()
{
return
-
1
;
}
}
}
cat-core/src/main/java/com/dianping/cat/message/io/InMemoryQueue.java
浏览文件 @
f5d66f17
...
...
@@ -9,10 +9,10 @@ import org.codehaus.plexus.logging.Logger;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable
;
import
org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException
;
import
com.dianping.cat.message.
Messag
e
;
import
com.dianping.cat.message.
spi.MessageTre
e
;
public
class
InMemoryQueue
implements
LogEnabled
,
Initializable
{
private
BlockingQueue
<
Message
>
m_queue
;
private
BlockingQueue
<
Message
Tree
>
m_queue
;
private
int
m_queueSize
;
...
...
@@ -26,30 +26,42 @@ public class InMemoryQueue implements LogEnabled, Initializable {
@Override
public
void
initialize
()
throws
InitializationException
{
if
(
m_queueSize
<=
0
)
{
m_queue
=
new
LinkedBlockingQueue
<
Message
>();
m_queue
=
new
LinkedBlockingQueue
<
Message
Tree
>();
}
else
{
m_queue
=
new
LinkedBlockingQueue
<
Message
>(
m_queueSize
);
m_queue
=
new
LinkedBlockingQueue
<
Message
Tree
>(
m_queueSize
);
}
}
public
void
offer
(
Message
messag
e
)
{
while
(!
m_queue
.
offer
(
messag
e
))
{
// throw away the
messag
e at the tail
Message
m
=
m_queue
.
poll
();
public
void
offer
(
Message
Tree
tre
e
)
{
while
(!
m_queue
.
offer
(
tre
e
))
{
// throw away the
tre
e at the tail
Message
Tree
m
=
m_queue
.
poll
();
if
(
m
==
null
)
{
break
;
}
else
{
m_logger
.
warn
(
messag
e
+
" was thrown away due to queue is full!"
);
m_logger
.
warn
(
tre
e
+
" was thrown away due to queue is full!"
);
}
}
}
public
Message
poll
(
long
timeout
)
throws
InterruptedException
{
return
m_queue
.
poll
(
timeout
,
TimeUnit
.
MILLISECONDS
);
public
MessageTree
peek
()
{
return
m_queue
.
peek
();
}
public
MessageTree
poll
(
long
timeout
)
throws
InterruptedException
{
if
(
timeout
<=
0
)
{
return
m_queue
.
poll
();
}
else
{
return
m_queue
.
poll
(
timeout
,
TimeUnit
.
MILLISECONDS
);
}
}
public
void
setQueueSize
(
int
queueSize
)
{
m_queueSize
=
queueSize
;
}
public
int
size
()
{
return
m_queue
.
size
();
}
}
cat-core/src/main/java/com/dianping/cat/message/io/InMemoryReceiver.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.io
;
import
com.dianping.cat.message.
Message
;
import
com.dianping.cat.message.
handler.MessageHandler
;
import
com.dianping.cat.message.
spi.MessageHandler
;
import
com.dianping.cat.message.
spi.MessageTree
;
import
com.site.lookup.annotation.Inject
;
public
class
InMemoryReceiver
implements
MessageReceiver
{
...
...
@@ -24,10 +24,10 @@ public class InMemoryReceiver implements MessageReceiver {
public
void
onMessage
(
MessageHandler
handler
)
{
try
{
while
(
true
)
{
Message
m
=
m_queue
.
poll
(
1
);
Message
Tree
tree
=
m_queue
.
poll
(
1
);
if
(
m
!=
null
)
{
handler
.
handle
(
m
);
if
(
tree
!=
null
)
{
handler
.
handle
(
tree
);
}
else
if
(!
isActive
())
{
break
;
}
...
...
cat-core/src/main/java/com/dianping/cat/message/io/InMemorySender.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.io
;
import
com.dianping.cat.message.
Messag
e
;
import
com.dianping.cat.message.
spi.MessageTre
e
;
import
com.site.lookup.annotation.Inject
;
public
class
InMemorySender
implements
MessageSender
{
...
...
@@ -20,9 +20,9 @@ public class InMemorySender implements MessageSender {
}
@Override
public
void
send
(
Message
messag
e
)
{
public
void
send
(
Message
Tree
tre
e
)
{
if
(
isActive
())
{
m_queue
.
offer
(
messag
e
);
m_queue
.
offer
(
tre
e
);
}
}
...
...
cat-core/src/main/java/com/dianping/cat/message/io/MessageReceiver.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.io
;
import
com.dianping.cat.message.
handler
.MessageHandler
;
import
com.dianping.cat.message.
spi
.MessageHandler
;
public
interface
MessageReceiver
{
public
void
initialize
();
...
...
cat-core/src/main/java/com/dianping/cat/message/io/MessageSender.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.io
;
import
com.dianping.cat.message.
Messag
e
;
import
com.dianping.cat.message.
spi.MessageTre
e
;
public
interface
MessageSender
{
public
void
initialize
();
public
void
send
(
Message
messag
e
);
public
void
send
(
Message
Tree
tre
e
);
public
void
shutdown
();
}
cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketReceiver.java
浏览文件 @
f5d66f17
...
...
@@ -21,9 +21,9 @@ import org.jboss.netty.channel.group.DefaultChannelGroup;
import
org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
;
import
org.jboss.netty.handler.codec.frame.FrameDecoder
;
import
com.dianping.cat.message.
Message
;
import
com.dianping.cat.message.
codec.MessageCodec
;
import
com.dianping.cat.message.
handler.MessageHandler
;
import
com.dianping.cat.message.
spi.MessageCodec
;
import
com.dianping.cat.message.
spi.MessageHandler
;
import
com.dianping.cat.message.
spi.MessageTree
;
import
com.site.lookup.annotation.Inject
;
public
class
TcpSocketReceiver
implements
MessageReceiver
{
...
...
@@ -43,9 +43,9 @@ public class TcpSocketReceiver implements MessageReceiver {
private
MessageHandler
m_messageHandler
;
void
handleMessage
(
byte
[]
data
)
{
Message
messag
e
=
m_codec
.
decode
(
data
);
Message
Tree
tre
e
=
m_codec
.
decode
(
data
);
m_messageHandler
.
handle
(
messag
e
);
m_messageHandler
.
handle
(
tre
e
);
}
@Override
...
...
cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java
浏览文件 @
f5d66f17
...
...
@@ -18,8 +18,8 @@ import org.jboss.netty.channel.MessageEvent;
import
org.jboss.netty.channel.SimpleChannelHandler
;
import
org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
;
import
com.dianping.cat.message.
Message
;
import
com.dianping.cat.message.
codec.MessageCodec
;
import
com.dianping.cat.message.
spi.MessageCodec
;
import
com.dianping.cat.message.
spi.MessageTree
;
import
com.site.lookup.annotation.Inject
;
public
class
TcpSocketSender
implements
MessageSender
{
...
...
@@ -69,14 +69,15 @@ public class TcpSocketSender implements MessageSender {
}
@Override
public
void
send
(
Message
messag
e
)
{
byte
[]
data
=
m_codec
.
encode
(
messag
e
);
public
void
send
(
Message
Tree
tre
e
)
{
byte
[]
data
=
m_codec
.
encode
(
tre
e
);
Channel
channel
=
m_future
.
getChannel
();
ChannelBufferFactory
factory
=
channel
.
getConfig
().
getBufferFactory
();
ChannelBuffer
buf
=
factory
.
getBuffer
(
data
.
length
+
4
);
buf
.
writeInt
(
data
.
length
);
buf
.
writeBytes
(
data
);
channel
.
write
(
buf
);
}
...
...
cat-core/src/main/java/com/dianping/cat/message/io/Transport.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.io
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.
handler
.MessageHandler
;
import
com.dianping.cat.message.
spi
.MessageHandler
;
/**
* <p>
...
...
cat-core/src/main/java/com/dianping/cat/message/spi/AbstractMessageAnalyzer.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
abstract
class
AbstractMessageAnalyzer
<
R
>
implements
MessageAnalyzer
{
@Override
public
void
analyze
(
MessageQueue
queue
)
{
while
(
queue
.
isActive
())
{
MessageTree
tree
=
queue
.
poll
();
if
(
tree
!=
null
)
{
process
(
tree
);
}
}
R
result
=
generate
();
store
(
result
);
}
protected
abstract
void
store
(
R
result
);
public
abstract
R
generate
();
protected
abstract
void
process
(
MessageTree
tree
);
}
cat-core/src/main/java/com/dianping/cat/message/spi/MessageAnalyzer.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
interface
MessageAnalyzer
{
public
void
analyze
(
MessageQueue
queue
);
}
cat-core/src/main/java/com/dianping/cat/message/spi/MessageCodec.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
interface
MessageCodec
{
public
byte
[]
encode
(
MessageTree
tree
);
public
MessageTree
decode
(
byte
[]
data
);
}
cat-core/src/main/java/com/dianping/cat/message/spi/MessageConsumer.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
interface
MessageConsumer
{
public
String
getId
();
public
void
consume
(
MessageTree
tree
);
}
cat-core/src/main/java/com/dianping/cat/message/spi/MessageConsumerRegistry.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
import
java.util.Map
;
public
interface
MessageConsumerRegistry
{
public
void
registerFilter
(
MessageFilter
filter
);
public
void
registerConsumer
(
MessageConsumer
consumer
);
public
Map
<
String
,
MessageConsumer
>
getConsumers
();
}
\ No newline at end of file
cat-core/src/main/java/com/dianping/cat/message/spi/MessageFilter.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
interface
MessageFilter
{
public
String
getConsumerId
();
public
boolean
doFilter
(
byte
[]
data
);
}
cat-core/src/main/java/com/dianping/cat/message/spi/MessageHandler.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
interface
MessageHandler
{
public
void
handle
(
MessageTree
message
);
}
cat-core/src/main/java/com/dianping/cat/message/spi/MessageQueue.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
interface
MessageQueue
{
public
boolean
isActive
();
public
MessageTree
poll
();
}
cat-core/src/main/java/com/dianping/cat/message/spi/MessageTree.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi
;
public
interface
MessageTree
{
public
String
getDomain
();
public
String
getHostName
();
public
String
getIpAddress
();
public
MessageTree
getMessage
();
public
String
getMessageId
();
public
int
getPort
();
public
String
getRequestToken
();
public
String
getSessionToken
();
public
String
getThreadId
();
}
cat-core/src/main/java/com/dianping/cat/message/spi/codec/PlainTextMessageCodec.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi.codec
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageTree
;
public
class
PlainTextMessageCodec
implements
MessageCodec
{
@Override
public
byte
[]
encode
(
MessageTree
tree
)
{
return
null
;
}
@Override
public
MessageTree
decode
(
byte
[]
data
)
{
return
null
;
}
}
cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageConsumerRegistry.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi.internal
;
import
java.util.LinkedHashMap
;
import
java.util.List
;
import
java.util.Map
;
import
com.dianping.cat.message.spi.MessageConsumer
;
import
com.dianping.cat.message.spi.MessageConsumerRegistry
;
import
com.dianping.cat.message.spi.MessageFilter
;
public
class
DefaultMessageConsumerRegistry
implements
MessageConsumerRegistry
{
private
Map
<
String
,
MessageConsumer
>
m_consumers
=
new
LinkedHashMap
<
String
,
MessageConsumer
>();
@Override
public
Map
<
String
,
MessageConsumer
>
getConsumers
()
{
return
m_consumers
;
}
@Override
public
void
registerConsumer
(
MessageConsumer
consumer
)
{
m_consumers
.
put
(
consumer
.
getId
(),
consumer
);
}
@Override
public
void
registerFilter
(
MessageFilter
filter
)
{
}
static
class
Entry
{
private
MessageConsumer
m_message
;
private
List
<
MessageFilter
>
m_filters
;
}
}
cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageTree.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.spi.internal
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.spi.MessageTree
;
public
class
DefaultMessageTree
implements
MessageTree
{
@Override
public
String
getDomain
()
{
return
null
;
}
@Override
public
String
getHostName
()
{
return
null
;
}
@Override
public
String
getIpAddress
()
{
return
null
;
}
@Override
public
MessageTree
getMessage
()
{
return
null
;
}
@Override
public
String
getMessageId
()
{
return
null
;
}
@Override
public
int
getPort
()
{
return
0
;
}
@Override
public
String
getRequestToken
()
{
return
null
;
}
@Override
public
String
getSessionToken
()
{
return
null
;
}
@Override
public
String
getThreadId
()
{
return
null
;
}
public
void
setMessage
(
Message
message
)
{
}
}
cat-core/src/main/java/com/dianping/cat/message/
handler
/MessageDispatcher.java
→
cat-core/src/main/java/com/dianping/cat/message/
spi/internal
/MessageDispatcher.java
浏览文件 @
f5d66f17
package
com.dianping.cat.message.
handler
;
package
com.dianping.cat.message.
spi.internal
;
import
java.util.
List
;
import
java.util.
Collection
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.consumer.MessageConsumer
;
import
com.dianping.cat.message.consumer.MessageConsumerRegistry
;
import
com.dianping.cat.message.io.MessageReceiver
;
import
com.dianping.cat.message.spi.MessageConsumer
;
import
com.dianping.cat.message.spi.MessageHandler
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.site.lookup.annotation.Inject
;
public
class
MessageDispatcher
implements
MessageHandler
,
Runnable
{
...
...
@@ -13,15 +13,15 @@ public class MessageDispatcher implements MessageHandler, Runnable {
private
MessageReceiver
m_receiver
;
@Inject
private
MessageConsumerRegistry
m_registry
;
private
Default
MessageConsumerRegistry
m_registry
;
@Override
public
void
handle
(
Message
messag
e
)
{
List
<
MessageConsumer
>
comsumers
=
m_registry
.
getConsumer
s
();
public
void
handle
(
Message
Tree
tre
e
)
{
Collection
<
MessageConsumer
>
comsumers
=
m_registry
.
getConsumers
().
value
s
();
for
(
MessageConsumer
consumer
:
comsumers
)
{
try
{
consumer
.
consume
(
messag
e
);
consumer
.
consume
(
tre
e
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
...
...
@@ -37,7 +37,7 @@ public class MessageDispatcher implements MessageHandler, Runnable {
m_receiver
=
receiver
;
}
public
void
setRegistry
(
MessageConsumerRegistry
registry
)
{
public
void
setRegistry
(
Default
MessageConsumerRegistry
registry
)
{
m_registry
=
registry
;
}
}
cat-core/src/main/resources/META-INF/plexus/components.xml
浏览文件 @
f5d66f17
...
...
@@ -29,8 +29,13 @@
<implementation>
com.dianping.cat.message.internal.DefaultMessageProducer
</implementation>
</component>
<component>
<role>
com.dianping.cat.message.handler.MessageHandler
</role>
<implementation>
com.dianping.cat.message.handler.MessageDispatcher
</implementation>
<role>
com.dianping.cat.message.spi.MessageCodec
</role>
<role-hint>
plain-text
</role-hint>
<implementation>
com.dianping.cat.message.spi.codec.PlainTextMessageCodec
</implementation>
</component>
<component>
<role>
com.dianping.cat.message.spi.MessageHandler
</role>
<implementation>
com.dianping.cat.message.spi.internal.MessageDispatcher
</implementation>
<requirements>
<requirement>
<role>
com.dianping.cat.message.io.MessageReceiver
</role>
...
...
cat-core/src/test/java/com/dianping/cat/message/internal/MessageProducerTest.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.internal
;
import
junit.framework.Assert
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.junit.runners.JUnit4
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.MessageProducer
;
import
com.dianping.cat.message.Transaction
;
import
com.dianping.cat.message.io.InMemoryQueue
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.site.lookup.ComponentTestCase
;
@RunWith
(
JUnit4
.
class
)
public
class
MessageProducerTest
extends
ComponentTestCase
{
@Test
public
void
testNormal
()
throws
Exception
{
MessageProducer
producer
=
lookup
(
MessageProducer
.
class
);
InMemoryQueue
queue
=
lookup
(
InMemoryQueue
.
class
);
MessageCodec
codec
=
lookup
(
MessageCodec
.
class
,
"plain-text"
);
Transaction
t
=
producer
.
newTransaction
(
"URL"
,
"MyPage"
);
try
{
// do your business here
t
.
addData
(
"k1"
,
"v1"
);
t
.
addData
(
"k2"
,
"v2"
);
t
.
addData
(
"k3"
,
"v3"
);
Thread
.
sleep
(
30
);
producer
.
logEvent
(
"URL"
,
"Payload"
,
Message
.
SUCCESS
,
"host=my-host&ip=127.0.0.1&agent=..."
);
t
.
setStatus
(
Message
.
SUCCESS
);
}
catch
(
Exception
e
)
{
t
.
setStatus
(
e
);
}
finally
{
t
.
complete
();
}
Assert
.
assertEquals
(
"One message should be in the queue."
,
1
,
queue
.
size
());
MessageTree
tree
=
queue
.
peek
();
Assert
.
assertEquals
(
"..."
,
codec
.
encode
(
tree
));
}
}
cat-core/src/test/java/com/dianping/cat/message/internal/MessageProducerTestConfigurator.java
0 → 100644
浏览文件 @
f5d66f17
package
com.dianping.cat.message.internal
;
import
java.io.File
;
import
java.util.ArrayList
;
import
java.util.List
;
import
com.dianping.cat.message.MessageProducer
;
import
com.dianping.cat.message.io.MessageSender
;
import
com.site.lookup.configuration.AbstractResourceConfigurator
;
import
com.site.lookup.configuration.Component
;
public
class
MessageProducerTestConfigurator
extends
AbstractResourceConfigurator
{
public
static
void
main
(
String
[]
args
)
{
generatePlexusComponentsXmlFile
(
new
MessageProducerTestConfigurator
());
}
@Override
public
List
<
Component
>
defineComponents
()
{
List
<
Component
>
all
=
new
ArrayList
<
Component
>();
String
inMemory
=
"in-memory"
;
all
.
add
(
C
(
MessageProducer
.
class
,
DefaultMessageProducer
.
class
)
//
.
req
(
MessageSender
.
class
,
inMemory
));
return
all
;
}
@Override
protected
File
getConfigurationFile
()
{
return
new
File
(
"src/test/resources/"
+
MessageProducerTest
.
class
.
getName
().
replace
(
'.'
,
'/'
)
+
".xml"
);
}
}
cat-core/src/test/java/com/dianping/cat/message/io/InMemoryTest.java
浏览文件 @
f5d66f17
...
...
@@ -12,9 +12,9 @@ import org.junit.Test;
import
org.junit.runner.RunWith
;
import
org.junit.runners.JUnit4
;
import
com.dianping.cat.message.
Message
;
import
com.dianping.cat.message.
handler.MessageHandler
;
import
com.dianping.cat.message.
internal.AbstractMessag
e
;
import
com.dianping.cat.message.
spi.MessageHandler
;
import
com.dianping.cat.message.
spi.MessageTree
;
import
com.dianping.cat.message.
spi.internal.DefaultMessageTre
e
;
import
com.site.lookup.ComponentTestCase
;
@RunWith
(
JUnit4
.
class
)
...
...
@@ -41,7 +41,7 @@ public class InMemoryTest extends ComponentTestCase {
sender
.
initialize
();
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
sender
.
send
(
new
MockMessag
e
());
sender
.
send
(
new
DefaultMessageTre
e
());
}
sender
.
shutdown
();
...
...
@@ -58,16 +58,6 @@ public class InMemoryTest extends ComponentTestCase {
Assert
.
assertEquals
(
len
,
sb
.
length
());
}
static
class
MockMessage
extends
AbstractMessage
{
public
MockMessage
()
{
super
(
null
,
null
);
}
@Override
public
void
complete
()
{
}
}
static
class
MockMessageHandler
implements
MessageHandler
{
private
StringBuilder
m_sb
;
...
...
@@ -76,7 +66,7 @@ public class InMemoryTest extends ComponentTestCase {
}
@Override
public
void
handle
(
Message
messag
e
)
{
public
void
handle
(
Message
Tree
tre
e
)
{
m_sb
.
append
(
'.'
);
}
}
...
...
cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTest.java
浏览文件 @
f5d66f17
...
...
@@ -12,10 +12,11 @@ import org.junit.Test;
import
org.junit.runner.RunWith
;
import
org.junit.runners.JUnit4
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.codec.MessageCodec
;
import
com.dianping.cat.message.handler.MessageHandler
;
import
com.dianping.cat.message.internal.AbstractMessage
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.dianping.cat.message.spi.MessageHandler
;
import
com.dianping.cat.message.spi.MessageTree
;
import
com.dianping.cat.message.spi.internal.DefaultMessageTree
;
import
com.site.lookup.ComponentTestCase
;
@RunWith
(
JUnit4
.
class
)
...
...
@@ -42,7 +43,7 @@ public class TcpSocketTest extends ComponentTestCase {
sender
.
initialize
();
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
sender
.
send
(
new
MockMessag
e
());
sender
.
send
(
new
DefaultMessageTre
e
());
}
}
}));
...
...
@@ -72,12 +73,12 @@ public class TcpSocketTest extends ComponentTestCase {
public
static
class
MockMessageCodec
implements
MessageCodec
{
@Override
public
Message
decode
(
byte
[]
bytes
)
{
return
new
MockMessag
e
();
public
Message
Tree
decode
(
byte
[]
bytes
)
{
return
new
DefaultMessageTre
e
();
}
@Override
public
byte
[]
encode
(
Message
message
)
{
public
byte
[]
encode
(
Message
Tree
message
)
{
return
"mock"
.
getBytes
();
}
}
...
...
@@ -90,7 +91,7 @@ public class TcpSocketTest extends ComponentTestCase {
}
@Override
public
void
handle
(
Message
messag
e
)
{
public
void
handle
(
Message
Tree
tre
e
)
{
m_sb
.
append
(
'.'
);
}
}
...
...
cat-core/src/test/java/com/dianping/cat/message/io/TcpSocketTestConfigurator.java
浏览文件 @
f5d66f17
...
...
@@ -4,8 +4,8 @@ import java.io.File;
import
java.util.ArrayList
;
import
java.util.List
;
import
com.dianping.cat.message.codec.MessageCodec
;
import
com.dianping.cat.message.io.TcpSocketTest.MockMessageCodec
;
import
com.dianping.cat.message.spi.MessageCodec
;
import
com.site.lookup.configuration.AbstractResourceConfigurator
;
import
com.site.lookup.configuration.Component
;
...
...
cat-core/src/test/java/com/dianping/cat/message/io/TransportManagerTest.java
浏览文件 @
f5d66f17
...
...
@@ -5,9 +5,9 @@ import junit.framework.Assert;
import
org.junit.Test
;
import
com.dianping.cat.message.Message
;
import
com.dianping.cat.message.handler.MessageHandler
;
import
com.dianping.cat.message.io.Transport
;
import
com.dianping.cat.message.io.TransportManager
;
import
com.dianping.cat.message.spi.MessageHandler
;
public
class
TransportManagerTest
{
@Test
...
...
cat-core/src/test/resources/com/dianping/cat/message/internal/MessageProducerTest.xml
0 → 100644
浏览文件 @
f5d66f17
<plexus>
<components>
<component>
<role>
com.dianping.cat.message.MessageProducer
</role>
<implementation>
com.dianping.cat.message.internal.DefaultMessageProducer
</implementation>
<requirements>
<requirement>
<role>
com.dianping.cat.message.io.MessageSender
</role>
<role-hint>
in-memory
</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
cat-core/src/test/resources/com/dianping/cat/message/io/TcpSocketTest.xml
浏览文件 @
f5d66f17
<plexus>
<components>
<component>
<role>
com.dianping.cat.message.
codec
.MessageCodec
</role>
<role>
com.dianping.cat.message.
spi
.MessageCodec
</role>
<role-hint>
tcp-socket
</role-hint>
<implementation>
com.dianping.cat.message.io.TcpSocketTest$MockMessageCodec
</implementation>
</component>
...
...
@@ -14,7 +14,7 @@
</configuration>
<requirements>
<requirement>
<role>
com.dianping.cat.message.
codec
.MessageCodec
</role>
<role>
com.dianping.cat.message.
spi
.MessageCodec
</role>
<role-hint>
tcp-socket
</role-hint>
</requirement>
</requirements>
...
...
@@ -28,7 +28,7 @@
</configuration>
<requirements>
<requirement>
<role>
com.dianping.cat.message.
codec
.MessageCodec
</role>
<role>
com.dianping.cat.message.
spi
.MessageCodec
</role>
<role-hint>
tcp-socket
</role-hint>
</requirement>
</requirements>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录