提交 1582dccb 编写于 作者: Y youyong

modify the cat home test case

......@@ -15,8 +15,8 @@
<artifactId>cat-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<scope>test</scope>
</dependency>
<dependency>
......
......@@ -6,9 +6,11 @@ import org.junit.runners.Suite.SuiteClasses;
import com.dianping.cat.consumer.failure.FailureAnalyzerStoreTest;
import com.dianping.cat.consumer.failure.FailureAnalyzerTest;
import com.dianping.cat.consumer.ip.IpAnalyzerTest;
import com.dianping.cat.consumer.failure.FailureReportTest;
import com.dianping.cat.consumer.ip.IpReportTest;
import com.dianping.cat.consumer.transaction.NumberFormatTest;
import com.dianping.cat.consumer.transaction.TransactionReportMessageAnalyzerTest;
import com.dianping.cat.consumer.transaction.TransactionReportTest;
@RunWith(Suite.class)
@SuiteClasses({
......@@ -22,13 +24,17 @@ FailureAnalyzerTest.class,
FailureAnalyzerStoreTest.class,
FailureReportTest.class,
/* .ip */
IpAnalyzerTest.class,
IpReportTest.class,
/* .transaction */
NumberFormatTest.class,
TransactionReportMessageAnalyzerTest.class
TransactionReportMessageAnalyzerTest.class,
TransactionReportTest.class
})
public class AllTests {
......
package com.dianping.cat.consumer.failure;
import org.junit.Assert;
import org.junit.Test;
import com.dianping.cat.consumer.failure.model.entity.FailureReport;
import com.dianping.cat.consumer.failure.model.transform.DefaultJsonBuilder;
import com.dianping.cat.consumer.failure.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.failure.model.transform.DefaultXmlParser;
import com.site.helper.Files;
public class FailureReportTest {
@Test
public void testXml() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("FailureReport.xml"), "utf-8");
FailureReport report = parser.parse(source);
String xml = new DefaultXmlBuilder().buildXml(report);
String expected = source;
Assert.assertEquals("XML is not well parsed!", expected.replace("\r", ""), xml.replace("\r", ""));
}
@Test
public void testJson() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("FailureReport.xml"), "utf-8");
FailureReport report = parser.parse(source);
String json = new DefaultJsonBuilder().buildJson(report);
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("FailureReport.json"), "utf-8");
Assert.assertEquals("XML is not well parsed or JSON is not well built!", expected.replace("\r", ""),
json.replace("\r", ""));
}
}
......@@ -4,16 +4,33 @@ import org.junit.Assert;
import org.junit.Test;
import com.dianping.cat.consumer.ip.model.entity.IpReport;
import com.dianping.cat.consumer.ip.model.transform.DefaultJsonBuilder;
import com.dianping.cat.consumer.ip.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.ip.model.transform.DefaultXmlParser;
import com.site.helper.Files;
public class IpAnalyzerTest {
public class IpReportTest {
@Test
public void testXml() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("ip.xml"), "utf-8");
IpReport report = parser.parse(expected);
String source = Files.forIO().readFrom(getClass().getResourceAsStream("IpReport.xml"), "utf-8");
IpReport report = parser.parse(source);
String xml = new DefaultXmlBuilder().buildXml(report);
String expected = source;
Assert.assertEquals("XML is not well parsed!", expected.replace("\r", ""), report.toString().replace("\r", ""));
Assert.assertEquals("XML is not well parsed!", expected.replace("\r", ""), xml.replace("\r", ""));
}
@Test
public void testJson() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("IpReport.xml"), "utf-8");
IpReport report = parser.parse(source);
String json = new DefaultJsonBuilder().buildJson(report);
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("IpReport.json"), "utf-8");
Assert.assertEquals("XML is not well parsed or JSON is not well built!", expected.replace("\r", ""),
json.replace("\r", ""));
}
}
package com.dianping.cat.consumer.transaction;
import org.junit.Assert;
import org.junit.Test;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultJsonBuilder;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.site.helper.Files;
public class TransactionReportTest {
/**
* <range minute="5" count="123" sum="123456" avg="22.2" transactions="3"/>
* <range minute="10" count="123" sum="12457" avg="222" transactions="3"/>
* <duration value="128" count="34"/> <duration value="256" count="12"/>
*/
@Test
public void testXml() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("TransactionReport.xml"), "utf-8");
TransactionReport report = parser.parse(source);
String xml = new DefaultXmlBuilder().buildXml(report);
String expected = source;
Assert.assertEquals("XML is not well parsed!", expected.replace("\r", ""), xml.replace("\r", ""));
}
@Test
public void testJson() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("TransactionReport.xml"), "utf-8");
TransactionReport report = parser.parse(source);
String json = new DefaultJsonBuilder().buildJson(report);
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("TransactionReport.json"), "utf-8");
Assert.assertEquals("XML is not well parsed or JSON is not well built!", expected.replace("\r", ""),
json.replace("\r", ""));
}
}
{
"domain": "Review",
"machine": "192.168.32.68",
"startTime": "2012-02-16 22:00:00",
"endTime": "2012-02-16 22:59:00",
"threads": {
"threads": [
"main",
"main2"
]
},
"segments": {
"2012-02-16 22:00": {
"id": "2012-02-16 22:00",
"entries": [
{
"type": "Error",
"path": "20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html",
"threadId": "main",
"text": "java.lang.OutOfMemoryError"
}
]
},
"2012-02-16 22:01": {
"id": "2012-02-16 22:01",
"entries": [
{
"type": "Error",
"path": "20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html",
"threadId": "main2",
"text": "java.lang.OutOfMemoryError"
}
]
},
"2012-02-16 22:02": {
"id": "2012-02-16 22:02"
},
"2012-02-16 22:03": {
"id": "2012-02-16 22:03"
},
"2012-02-16 22:46": {
"id": "2012-02-16 22:46"
},
"2012-02-16 22:47": {
"id": "2012-02-16 22:47"
},
"2012-02-16 22:48": {
"id": "2012-02-16 22:48",
"entries": [
{
"type": "Error",
"path": "20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html",
"threadId": "main",
"text": "java.lang.OutOfMemoryError"
},
{
"type": "Exception",
"path": "20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html",
"threadId": "main",
"text": "java.lang.Exception"
},
{
"type": "RuntimeException",
"path": "20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html",
"threadId": "main",
"text": "java.lang.RuntimeException"
},
{
"type": "Exception",
"path": "20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html",
"threadId": "main2",
"text": "java.lang.Exception"
},
{
"type": "RuntimeException",
"path": "20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html",
"threadId": "main2",
"text": "java.lang.NullPointerException"
}
]
},
"2012-02-16 22:49": {
"id": "2012-02-16 22:49"
},
"2012-02-16 22:50": {
"id": "2012-02-16 22:50"
}
}
}
<failure-report domain="Review" machine="192.168.32.68" startTime="2012-02-16 22:00:00" endTime="2012-02-16 22:59:00">
<threads>
<thread>main</thread>
<thread>main2</thread>
</threads>
<segment id="2012-02-16 22:00">
<entry type="Error" path="20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html" threadId="main">java.lang.OutOfMemoryError</entry>
</segment>
<segment id="2012-02-16 22:01">
<entry type="Error" path="20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html" threadId="main2">java.lang.OutOfMemoryError</entry>
</segment>
<segment id="2012-02-16 22:02">
</segment>
<segment id="2012-02-16 22:03">
</segment>
<segment id="2012-02-16 22:46">
</segment>
<segment id="2012-02-16 22:47">
</segment>
<segment id="2012-02-16 22:48">
<entry type="Error" path="20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html" threadId="main">java.lang.OutOfMemoryError</entry>
<entry type="Exception" path="20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html" threadId="main">java.lang.Exception</entry>
<entry type="RuntimeException" path="20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html" threadId="main">java.lang.RuntimeException</entry>
<entry type="Exception" path="20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html" threadId="main2">java.lang.Exception</entry>
<entry type="RuntimeException" path="20120216/22/Review/d872b43b-acde-40c4-a02b-c0df1fc88c97.html" threadId="main2">java.lang.NullPointerException</entry>
</segment>
<segment id="2012-02-16 22:49">
</segment>
<segment id="2012-02-16 22:50">
</segment>
</failure-report>
{
"domain": "Review",
"startTime": "2012-01-25 13:00:00",
"endTime": "2012-01-25 13:59:00",
"periods": {
"0": {
"minute": 0,
"ips": {
"127.0.0.1": {
"address": "127.0.0.1",
"count": 19
},
"192.168.63.30": {
"address": "192.168.63.30",
"count": 11
}
}
},
"1": {
"minute": 1
}
}
}
{
"domain": "Cat",
"startTime": "2012-02-16 23:00:00",
"endTime": "2012-02-16 23:59:00",
"domains": [
"Cat"
],
"types": {
"URL": {
"id": "URL",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html",
"totalCount": 11,
"failCount": 0,
"failPercent": "0.00",
"min": 0.0,
"max": 194.0,
"avg": "47.1",
"sum": 518.0,
"sum2": 73942.0,
"std": "67.1",
"names": {
"home": {
"id": "home",
"totalCount": 1,
"failCount": 0,
"failPercent": "0.00",
"min": 175.0,
"max": 175.0,
"avg": "175.0",
"sum": 175.0,
"sum2": 30625.0,
"std": "0.0",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html"
},
"service": {
"id": "service",
"totalCount": 8,
"failCount": 0,
"failPercent": "0.00",
"min": 1.0,
"max": 58.0,
"avg": "13.0",
"sum": 104.0,
"sum2": 3952.0,
"std": "18.0",
"successMessageUrl": "20120216/23/Cat/b10bdefb-1eca-45e1-a9c3-52367078b5a2.html"
},
"t": {
"id": "t",
"totalCount": 1,
"failCount": 0,
"failPercent": "0.00",
"min": 193.0,
"max": 193.0,
"avg": "193.0",
"sum": 193.0,
"sum2": 37249.0,
"std": "0.0",
"successMessageUrl": "20120216/23/Cat/8e7c91a0-7549-4b13-b43b-252b2a6ef4bb.html"
},
"ip": {
"id": "ip",
"totalCount": 1,
"failCount": 0,
"failPercent": "0.00",
"min": 46.0,
"max": 46.0,
"avg": "46.0",
"sum": 46.0,
"sum2": 2116.0,
"std": "0.0",
"successMessageUrl": "20120216/23/Cat/2029c32e-b692-4e43-8eaf-96b8d6c846a2.html"
}
}
},
"MVC": {
"id": "MVC",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html",
"totalCount": 33,
"failCount": 0,
"failPercent": "0.00",
"min": 0.0,
"max": 191.0,
"avg": "15.1",
"sum": 499.0,
"sum2": 68377.0,
"std": "42.9",
"names": {
"InboundPhase": {
"id": "InboundPhase",
"totalCount": 11,
"failCount": 0,
"failPercent": "0.00",
"min": 0.0,
"max": 17.0,
"avg": "1.5",
"sum": 17.0,
"sum2": 289.0,
"std": "4.9",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html"
},
"TransitionPhase": {
"id": "TransitionPhase",
"totalCount": 11,
"failCount": 0,
"failPercent": "0.00",
"min": 0.0,
"max": 4.9E-324,
"avg": "0.0",
"sum": 0.0,
"sum2": 0.0,
"std": "0.0",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html"
},
"OutboundPhase": {
"id": "OutboundPhase",
"totalCount": 11,
"failCount": 0,
"failPercent": "0.00",
"min": 1.0,
"max": 191.0,
"avg": "43.8",
"sum": 482.0,
"sum2": 68088.0,
"std": "65.3",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html"
}
}
},
"NEW1": {
"id": "NEW1",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html",
"totalCount": 33,
"failCount": 0,
"failPercent": "0.00",
"min": 0.0,
"max": 191.0,
"avg": "15.1",
"sum": 499.0,
"sum2": 68377.0,
"std": "42.9",
"names": {
"InboundPhase": {
"id": "InboundPhase",
"totalCount": 11,
"failCount": 0,
"failPercent": "0.00",
"min": 0.0,
"max": 17.0,
"avg": "1.5",
"sum": 17.0,
"sum2": 289.0,
"std": "4.9",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html"
},
"TransitionPhase": {
"id": "TransitionPhase",
"totalCount": 11,
"failCount": 0,
"failPercent": "0.00",
"min": 0.0,
"max": 4.9E-324,
"avg": "0.0",
"sum": 0.0,
"sum2": 0.0,
"std": "0.0",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html"
},
"OutboundPhase": {
"id": "OutboundPhase",
"totalCount": 11,
"failCount": 0,
"failPercent": "0.00",
"min": 1.0,
"max": 191.0,
"avg": "43.8",
"sum": 482.0,
"sum2": 68088.0,
"std": "65.3",
"successMessageUrl": "20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html"
}
}
}
}
}
<transaction-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="194.0" avg="47.1" sum="518.0" sum2="73942.0" std="67.1">
<name id="home" totalCount="1" failCount="0" failPercent="0.00" min="175.0" max="175.0" avg="175.0" sum="175.0" sum2="30625.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="service" totalCount="8" failCount="0" failPercent="0.00" min="1.0" max="58.0" avg="13.0" sum="104.0" sum2="3952.0" std="18.0">
<successMessageUrl>20120216/23/Cat/b10bdefb-1eca-45e1-a9c3-52367078b5a2.html</successMessageUrl>
</name>
<name id="t" totalCount="1" failCount="0" failPercent="0.00" min="193.0" max="193.0" avg="193.0" sum="193.0" sum2="37249.0" std="0.0">
<successMessageUrl>20120216/23/Cat/8e7c91a0-7549-4b13-b43b-252b2a6ef4bb.html</successMessageUrl>
</name>
<name id="ip" totalCount="1" failCount="0" failPercent="0.00" min="46.0" max="46.0" avg="46.0" sum="46.0" sum2="2116.0" std="0.0">
<successMessageUrl>20120216/23/Cat/2029c32e-b692-4e43-8eaf-96b8d6c846a2.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="MVC" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<name id="InboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="17.0" avg="1.5" sum="17.0" sum2="289.0" std="4.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="TransitionPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="OutboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="482.0" sum2="68088.0" std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="NEW1" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<name id="InboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="17.0" avg="1.5" sum="17.0" sum2="289.0" std="4.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="TransitionPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="OutboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="482.0" sum2="68088.0" std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
</transaction-report>
......@@ -19,8 +19,8 @@
<artifactId>netty</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<scope>test</scope>
</dependency>
<dependency>
......
......@@ -57,6 +57,10 @@ public class Cat {
public static MessageProducer getProducer() {
return getInstance().m_producer;
}
public static MessageManager getManager() {
return getInstance().m_manager;
}
// this should be called during application initialization time
public static void initialize(File configFile) {
......@@ -132,8 +136,8 @@ public class Cat {
// this should be called when a thread starts to create some thread local
// data
public static void setup(String sessionToken, String requestToken) {
getInstance().m_manager.setup(sessionToken, requestToken);
public static void setup(String sessionToken) {
getInstance().m_manager.setup();
}
void setContainer(PlexusContainer container) {
......
......@@ -22,6 +22,7 @@ import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.BufferWriter;
import com.dianping.cat.message.spi.codec.EscapingBufferWriter;
import com.dianping.cat.message.spi.codec.HtmlEncodingBufferWriter;
......@@ -33,6 +34,13 @@ import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
import com.dianping.cat.message.spi.internal.DefaultMessageHandler;
import com.dianping.cat.message.spi.internal.DefaultMessagePathBuilder;
import com.dianping.cat.message.spi.internal.DefaultMessageStorage;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketFactory;
import com.dianping.cat.storage.BucketManager;
import com.dianping.cat.storage.internal.DefaultBucket;
import com.dianping.cat.storage.internal.DefaultBucketFactory;
import com.dianping.cat.storage.internal.DefaultBucketManager;
import com.dianping.cat.storage.internal.DefaultMessageBucket;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -83,6 +91,13 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageManager.class, MessageConsumerRegistry.class));
all.add(C(Bucket.class, String.class.getName(), DefaultBucket.class));
all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class));
all.add(C(Bucket.class, MessageTree.class.getName(), DefaultMessageBucket.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class));
all.add(C(BucketFactory.class, DefaultBucketFactory.class));
return all;
}
......
......@@ -95,6 +95,11 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
return m_serverConfig;
}
@Override
public MessageTree getThreadLocalMessageTree() {
return getContext().m_tree;
}
@Override
public void initializeClient(Config clientConfig) {
if (clientConfig != null) {
......@@ -132,8 +137,8 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
@Override
public void setup(String sessionToken, String requestToken) {
Context ctx = new Context(m_domain, m_hostName, m_ipAddress, sessionToken, requestToken);
public void setup() {
Context ctx = new Context(m_domain, m_hostName, m_ipAddress);
m_context.set(ctx);
}
......@@ -153,19 +158,16 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
private Stack<Transaction> m_stack;
public Context(String domain, String hostName, String ipAddress, String sessionToken, String requestToken) {
public Context(String domain, String hostName, String ipAddress) {
m_tree = new DefaultMessageTree();
m_stack = new Stack<Transaction>();
m_tree.setDomain(domain);
m_tree.setSessionToken(sessionToken);
m_tree.setRequestToken(requestToken);
Thread thread = Thread.currentThread();
m_tree.setThreadId(Long.toHexString(thread.getId()));
m_tree.setThreadId(thread.getName());
m_tree.setDomain(domain);
m_tree.setHostName(hostName);
m_tree.setIpAddress(ipAddress);
}
......@@ -175,7 +177,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
MessageTree tree = m_tree.copy();
tree.setMessage(message);
tree.setMessageId(UUID.randomUUID().toString());
tree.setMessageId(createMessageId());
manager.flush(tree);
} else {
Transaction entry = m_stack.peek();
......@@ -184,6 +186,10 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
}
String createMessageId() {
return UUID.randomUUID().toString();
}
public void end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.peek();
......@@ -200,7 +206,6 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
MessageTree tree = m_tree.copy();
m_tree.setMessage(null);
tree.setMessageId(UUID.randomUUID().toString());
manager.flush(tree);
}
}
......@@ -212,6 +217,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
entry.addChild(transaction);
} else {
m_tree.setMessageId(createMessageId());
m_tree.setMessage(transaction);
}
......
......@@ -4,23 +4,80 @@ import com.dianping.cat.configuration.model.entity.Config;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
/**
* Message manager to help build CAT message.
* <p>
*
* Notes: This method is reserved for internal usage only. Application developer
* should never call this method directly.
*/
public interface MessageManager {
public void add(Message message);
/**
* Be triggered when a transaction ends, whatever it's the root transaction
* or nested transaction. However, if it's the root transaction then it will
* be flushed to back-end CAT server asynchronously.
* <p>
*
* @param transaction
*/
public void end(Transaction transaction);
/**
* Return configuration for CAT client.
*
* @return CAT configuration
*/
public Config getClientConfig();
/**
* Return configuration for CAT client.
*
* @return CAT configuration
*/
public Config getServerConfig();
/**
* Initialize CAT client with given CAT configuration.
*
* @param config
* CAT configuration
*/
public void initializeClient(Config config);
/**
* Initialize CAT server with given CAT configuration.
*
* @param config
* CAT configuration
*/
public void initializeServer(Config config);
/**
* Do cleanup for current thread environment in order to release resources in
* thread local objects.
*/
public void reset();
public void setup(String sessionToken, String requestToken);
/**
* Do setup for current thread environment in order to prepare thread local
* objects.
*/
public void setup();
/**
* Be triggered when a new transaction starts, whatever it's the root
* transaction or nested transaction.
*
* @param transaction
*/
public void start(Transaction transaction);
/**
* Get thread local message information.
*
* @return message tree
*/
public MessageTree getThreadLocalMessageTree();
}
\ No newline at end of file
......@@ -15,7 +15,9 @@ public interface MessageTree extends Cloneable {
public String getMessageId();
public String getRequestToken();
public String getParentMessageId();
public String getRootMessageId();
public String getSessionToken();
......@@ -33,7 +35,9 @@ public interface MessageTree extends Cloneable {
public void setMessageId(String messageId);
public void setRequestToken(String requestToken);
public void setParentMessageId(String parentMessageId);
public void setRootMessageId(String rootMessageId);
public void setSessionToken(String sessionToken);
......
......@@ -2,7 +2,6 @@ package com.dianping.cat.message.spi.codec;
import org.jboss.netty.buffer.ChannelBuffer;
public class EscapingBufferWriter implements BufferWriter {
@Override
public int writeTo(ChannelBuffer buf, byte[] data) {
......
......@@ -20,7 +20,8 @@ import com.dianping.cat.message.spi.StringRope;
import com.site.lookup.annotation.Inject;
/**
* Local use only, do not use it over network since it only supports one-way encoding
* Local use only, do not use it over network since it only supports one-way
* encoding
*/
public class HtmlMessageCodec implements MessageCodec {
private static final String ID = "HT1"; // HTML version 1
......@@ -28,6 +29,9 @@ public class HtmlMessageCodec implements MessageCodec {
@Inject
private BufferWriter m_writer;
@Inject
private String m_logViewPrefix;
private BufferHelper m_bufferHelper = new BufferHelper(m_writer);
private DateHelper m_dateHelper = new DateHelper();
......@@ -69,7 +73,8 @@ public class HtmlMessageCodec implements MessageCodec {
count += helper.td(buf, tree.getThreadId());
count += helper.td(buf, tree.getThreadName());
count += helper.td(buf, tree.getMessageId());
count += helper.td(buf, tree.getRequestToken());
count += helper.td(buf, tree.getParentMessageId());
count += helper.td(buf, tree.getRootMessageId());
count += helper.td(buf, tree.getSessionToken());
count += helper.tr2(buf);
count += helper.crlf(buf);
......@@ -111,7 +116,7 @@ public class HtmlMessageCodec implements MessageCodec {
if (Message.SUCCESS.equals(message.getStatus())) {
count += helper.td(buf, message.getStatus());
} else {
count += helper.td(buf, message.getStatus(), "error");
count += helper.td(buf, message.getStatus(), "class=\"error\"");
}
Object data = message.getData();
......@@ -145,9 +150,45 @@ public class HtmlMessageCodec implements MessageCodec {
return count;
}
protected int encodeLogViewLink(Message message, ChannelBuffer buf, int level, LineCounter counter) {
BufferHelper helper = m_bufferHelper;
int count = 0;
if (counter != null) {
counter.inc();
count += helper.tr1(buf, counter.getCount() % 2 != 0 ? "odd" : "even");
} else {
count += helper.tr1(buf, null);
}
String link = message.getData().toString();
int id = Math.abs(link.hashCode());
count += helper.td1(buf);
count += helper.nbsp(buf, level * 2); // 2 spaces per level
count += helper.write(buf, String.format("<a href=\"%s%s\" onclick=\"show(%s);return false;\">[:: show ::]</a>",
m_logViewPrefix, link, id));
count += helper.td2(buf);
count += helper.td(buf, "", "colspan=\"4\" id=\"" + id + "\"");
count += helper.tr2(buf);
count += helper.crlf(buf);
return count;
}
public int encodeMessage(Message message, ChannelBuffer buf, int level, LineCounter counter) {
if (message instanceof Event) {
return encodeLine(message, buf, 'E', Policy.DEFAULT, level, counter);
String type = message.getType();
if ("RemoteCall".equals(type)) {
return encodeLogViewLink(message, buf, level, counter);
} else {
return encodeLine(message, buf, 'E', Policy.DEFAULT, level, counter);
}
} else if (message instanceof Transaction) {
Transaction transaction = (Transaction) message;
List<Message> children = transaction.getChildren();
......@@ -183,6 +224,10 @@ public class HtmlMessageCodec implements MessageCodec {
m_bufferHelper = new BufferHelper(m_writer);
}
public void setLogViewPrefix(String logViewPrefix) {
m_logViewPrefix = logViewPrefix;
}
protected static class BufferHelper {
private static byte[] TABLE1 = "<table class=\"logview\">".getBytes();
......@@ -233,7 +278,7 @@ public class HtmlMessageCodec implements MessageCodec {
return td(buf, str, null);
}
public int td(ChannelBuffer buf, String str, String styleClass) {
public int td(ChannelBuffer buf, String str, String attributes) {
if (str == null) {
str = "null";
}
......@@ -241,11 +286,11 @@ public class HtmlMessageCodec implements MessageCodec {
byte[] data = str.getBytes();
int count = 0;
if (styleClass == null) {
if (attributes == null) {
buf.writeBytes(TD1);
count += TD1.length;
} else {
String tag = "<td class=\"" + styleClass + "\">";
String tag = "<td " + attributes + ">";
byte[] bytes = tag.getBytes();
buf.writeBytes(bytes);
......@@ -266,6 +311,19 @@ public class HtmlMessageCodec implements MessageCodec {
return TD1.length;
}
public int td1(ChannelBuffer buf, String attributes) {
if (attributes == null) {
buf.writeBytes(TD1);
return TD1.length;
} else {
String tag = "<td " + attributes + ">";
byte[] bytes = tag.getBytes();
buf.writeBytes(bytes);
return bytes.length;
}
}
public int td2(ChannelBuffer buf) {
buf.writeBytes(TD2);
return TD2.length;
......
......@@ -55,7 +55,8 @@ public class PlainTextMessageCodec implements MessageCodec {
String threadId = helper.read(buf, TAB);
String threadName = helper.read(buf, TAB);
String messageId = helper.read(buf, TAB);
String requestToken = helper.read(buf, TAB);
String parentMessageId = helper.read(buf, TAB);
String rootMessageId = helper.read(buf, TAB);
String sessionToken = helper.read(buf, LF);
if (ID.equals(id)) {
......@@ -65,7 +66,8 @@ public class PlainTextMessageCodec implements MessageCodec {
tree.setThreadId(threadId);
tree.setThreadName(threadName);
tree.setMessageId(messageId);
tree.setRequestToken(requestToken);
tree.setParentMessageId(parentMessageId);
tree.setRootMessageId(rootMessageId);
tree.setSessionToken(sessionToken);
} else {
throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id));
......@@ -214,7 +216,9 @@ public class PlainTextMessageCodec implements MessageCodec {
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getMessageId());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getRequestToken());
count += helper.write(buf, tree.getParentMessageId());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getRootMessageId());
count += helper.write(buf, TAB);
count += helper.write(buf, tree.getSessionToken());
count += helper.write(buf, LF);
......
......@@ -18,7 +18,9 @@ public class DefaultMessageTree implements MessageTree {
private String m_messageId;
private String m_requestToken;
private String m_parentMessageId;
private String m_rootMessageId;
private String m_sessionToken;
......@@ -31,20 +33,41 @@ public class DefaultMessageTree implements MessageTree {
@Override
public DefaultMessageTree copy() {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setDomain(m_domain);
tree.setHostName(m_hostName);
tree.setIpAddress(m_ipAddress);
tree.setMessageId(m_messageId);
tree.setRequestToken(m_requestToken);
tree.setParentMessageId(m_parentMessageId);
tree.setRootMessageId(m_rootMessageId);
tree.setSessionToken(m_sessionToken);
tree.setThreadId(m_threadId);
tree.setThreadName(m_threadName);
tree.setMessage(m_message);
return tree;
}
@Override
public String getParentMessageId() {
return m_parentMessageId;
}
@Override
public void setParentMessageId(String parentMessageId) {
m_parentMessageId = parentMessageId;
}
@Override
public String getRootMessageId() {
return m_rootMessageId;
}
@Override
public void setRootMessageId(String rootMessageId) {
m_rootMessageId = rootMessageId;
}
@Override
public String getDomain() {
return m_domain;
......@@ -70,11 +93,6 @@ public class DefaultMessageTree implements MessageTree {
return m_messageId;
}
@Override
public String getRequestToken() {
return m_requestToken;
}
@Override
public String getSessionToken() {
return m_sessionToken;
......@@ -114,11 +132,6 @@ public class DefaultMessageTree implements MessageTree {
m_messageId = messageId;
}
@Override
public void setRequestToken(String requestToken) {
m_requestToken = requestToken;
}
@Override
public void setSessionToken(String sessionToken) {
m_sessionToken = sessionToken;
......
......@@ -25,12 +25,10 @@ public abstract class CatFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException,
ServletException {
HttpServletRequest req = (HttpServletRequest) request;
String requestToken = getRequestToken();
String sessionToken = req.getSession().getId();
// setup for thread local data
Cat.setup(sessionToken, requestToken);
Cat.setup(sessionToken);
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction("URL", req.getRequestURI());
......
......@@ -25,6 +25,6 @@ public class CatListener implements ServletContextListener {
Cat.initialize(new File(catClientXml));
//for background thread
Cat.setup(null,null);
Cat.setup(null);
}
}
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.List;
public interface Bucket<T> extends TagThreadSupport<T> {
public void close();
public List<T> findAllByIds(List<String> ids);
public T findById(String id);
public void initialize(Class<?> type, String path) throws IOException;
public boolean storeById(String id, T data);
}
package com.dianping.cat.storage;
public interface BucketFactory<T> {
public Bucket<T> create(String path);
}
\ No newline at end of file
package com.dianping.cat.storage;
import java.io.IOException;
import com.dianping.cat.message.spi.MessageTree;
public interface BucketManager {
public Bucket<MessageTree> getMessageBucket(String path) throws IOException;
public Bucket<String> getStringBucket(String path) throws IOException;
public Bucket<byte[]> getBytesBucket(String path) throws IOException;
}
package com.dianping.cat.storage;
import java.util.List;
/**
* Map to one HDFS directory for one report.
* <p>
*
* Sample tags: "thread:101", "session:abc", "request:xyz", "parent:xxx"
*/
public interface TagThreadSupport<T> {
public boolean storeById(String id, T data, String... tags);
public List<String> findAllIdsByTag(String tag);
public T findNextById(String id, Direction direction, String tag);
public static enum Direction {
FORWARD,
BACKWARD;
}
}
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.TagThreadSupport;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSupport<T>, LogEnabled {
private static final String[] EMPTY = new String[0];
@Inject
private String m_baseDir;
// key => offset of record
private Map<String, Long> m_idToOffsets = new HashMap<String, Long>();
// tag => list of ids
private Map<String, List<String>> m_tagToIds = new HashMap<String, List<String>>();
private File m_file;
private RandomAccessFile m_readFile;
private RandomAccessFile m_writeFile;
private ReentrantLock m_readLock;
private ReentrantLock m_writeLock;
private Logger m_logger;
@Override
public void close() {
m_writeLock.lock();
try {
m_writeFile.close();
m_idToOffsets.clear();
m_tagToIds.clear();
} catch (IOException e) {
// ignore it
} finally {
m_writeLock.unlock();
}
}
protected abstract T decode(ChannelBuffer buf) throws IOException;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
protected abstract void encode(T data, ChannelBuffer buf) throws IOException;
@Override
public List<T> findAllByIds(List<String> ids) {
List<T> list = new ArrayList<T>(ids.size());
for (String id : ids) {
list.add(findById(id));
}
return list;
}
@Override
public List<String> findAllIdsByTag(String tag) {
List<String> ids = m_tagToIds.get(tag);
if (ids == null) {
return Collections.emptyList();
} else {
return ids;
}
}
@Override
public T findById(String id) {
Long offset = m_idToOffsets.get(id);
if (offset != null) {
m_readLock.lock();
try {
long old = m_readFile.getFilePointer();
m_readFile.seek(offset);
m_readFile.readLine(); // first line is header, get rid of it
int num = Integer.parseInt(m_readFile.readLine());
byte[] bytes = new byte[num];
m_readFile.readFully(bytes);
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bytes);
T data = decode(buf);
m_readFile.seek(old);
return data;
} catch (Exception e) {
m_logger.error(String.format("Error when reading file(%s)!", m_file), e);
} finally {
m_readLock.unlock();
}
}
return null;
}
@Override
public T findNextById(String id, Direction direction, String tag) {
List<String> ids = m_tagToIds.get(tag);
if (ids != null) {
int index = ids.indexOf(id);
switch (direction) {
case FORWARD:
index++;
break;
case BACKWARD:
index--;
break;
}
if (index >= 0 && index < ids.size()) {
String nextId = ids.get(index);
return findById(nextId);
}
}
return null;
}
@Override
public void initialize(Class<?> type, String path) throws IOException {
m_writeLock = new ReentrantLock();
m_readLock = new ReentrantLock();
m_file = new File(m_baseDir, path);
m_file.getParentFile().mkdirs();
m_writeFile = new RandomAccessFile(m_file, "rw");
m_readFile = new RandomAccessFile(m_file, "r");
if (m_file.exists()) {
loadIndexes();
}
}
protected void loadIndexes() throws IOException {
byte[] data = new byte[8192];
m_writeLock.lock();
try {
while (true) {
long offset = m_writeFile.getFilePointer();
String first = m_writeFile.readLine();
if (first == null) { // EOF
break;
}
int num = Integer.parseInt(m_writeFile.readLine());
if (num > data.length) {
int newSize = data.length;
while (newSize < num) {
newSize += newSize / 2;
}
data = new byte[newSize];
}
m_writeFile.readFully(data, 0, num); // get rid of it
m_writeFile.readLine(); // get rid of empty line
List<String> parts = Splitters.by('\t').split(first);
if (parts.size() > 0) {
String id = parts.get(0);
parts.remove(0);
updateIndex(id, parts.toArray(EMPTY), offset);
}
}
} finally {
m_writeLock.unlock();
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
@Override
public boolean storeById(String id, T data) {
return storeById(id, data, EMPTY);
}
/**
* Store the message in the format of:<br>
*
* <xmp> <id>\t<tag1>\t<tag2>\t...\n <length of message>\n <message>\n </xmp>
*/
@Override
public boolean storeById(String id, T data, String... tags) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n";
byte[] first;
byte[] num;
int length;
try {
encode(data, buf);
length = buf.readInt();
first = attributes.getBytes("utf-8");
num = String.valueOf(length).getBytes("utf-8");
} catch (Exception e) {
m_logger.error(String.format("Error when preparing to write to file(%s)!", m_file), e);
return false;
}
m_writeLock.lock();
try {
long offset = m_writeFile.getFilePointer();
m_writeFile.write(first);
m_writeFile.write(num);
m_writeFile.write('\n');
m_writeFile.write(buf.array(), buf.readerIndex(), length);
m_writeFile.write('\n');
// TODO add a flag
m_writeFile.getChannel().force(true);
updateIndex(id, tags, offset);
return true;
} catch (Exception e) {
m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
return false;
} finally {
m_writeLock.unlock();
}
}
protected void updateIndex(String id, String[] tags, long offset) {
m_idToOffsets.put(id, offset);
for (String tag : tags) {
List<String> ids = m_tagToIds.get(tag);
if (ids == null) {
ids = new ArrayList<String>();
m_tagToIds.put(tag, ids);
}
ids.add(id);
}
}
}
package com.dianping.cat.storage.internal;
import java.io.IOException;
import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
public class DefaultBucket<T> extends AbstractFileBucket<T> {
private Class<?> m_type;
@SuppressWarnings("unchecked")
@Override
protected T decode(ChannelBuffer buf) throws IOException {
if (m_type == String.class) {
return (T) buf.toString(buf.readerIndex(), buf.readableBytes(), Charset.forName("utf-8"));
} else if (m_type == byte[].class) {
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
return (T) bytes;
} else {
throw new UnsupportedOperationException(String.format(
"Only String or byte[] are supported so far, but was %s.", m_type));
}
}
@Override
protected void encode(T data, ChannelBuffer buf) throws IOException {
if (m_type == String.class) {
String str = (String) data;
byte[] bytes = str.getBytes("utf-8");
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
} else if (m_type == byte[].class) {
byte[] bytes = (byte[]) data;
buf.writeInt(bytes.length);
buf.writeBytes(bytes);
} else {
throw new UnsupportedOperationException(String.format(
"Only String or byte[] are supported so far, but was %s.", m_type));
}
}
@Override
public void initialize(Class<?> type, String path) throws IOException {
super.initialize(type, path);
m_type = type;
if (m_type != String.class && m_type != byte[].class) {
throw new UnsupportedOperationException(String.format(
"Only String or byte[] are supported so far, but was %s.", m_type));
}
}
}
package com.dianping.cat.storage.internal;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketFactory;
public class DefaultBucketFactory<T> implements BucketFactory<T> {
@Override
public Bucket<T> create(String path) {
// TODO Auto-generated method stub
return null;
}
}
package com.dianping.cat.storage.internal;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.ContainerHolder;
public class DefaultBucketManager extends ContainerHolder implements BucketManager, Disposable {
private Map<Entry, Bucket<?>> m_map = new HashMap<Entry, Bucket<?>>();
protected Bucket<?> createBucket(String path, Class<?> type) throws IOException {
Bucket<?> bucket = lookup(Bucket.class, type.getName());
bucket.initialize(type, path);
return bucket;
}
@Override
public void dispose() {
for (Bucket<?> bucket : m_map.values()) {
release(bucket);
}
}
@SuppressWarnings("unchecked")
protected <T> Bucket<T> getBucket(Class<T> type, String path) throws IOException {
if (type == null || path == null) {
throw new IllegalArgumentException(String.format("Type(%s) or path(%s) can't be null.", type, path));
}
Entry entry = new Entry(type, path);
Bucket<?> bucket = m_map.get(entry);
if (bucket == null) {
synchronized (this) {
bucket = m_map.get(entry);
if (bucket == null) {
bucket = createBucket(path, type);
m_map.put(entry, bucket);
}
}
}
return (Bucket<T>) bucket;
}
@Override
public Bucket<byte[]> getBytesBucket(String path) throws IOException {
return getBucket(byte[].class, path);
}
@Override
public Bucket<MessageTree> getMessageBucket(String path) throws IOException {
return getBucket(MessageTree.class, path);
}
@Override
public Bucket<String> getStringBucket(String path) throws IOException {
return getBucket(String.class, path);
}
static class Entry {
private Class<?> m_type;
private String m_path;
public Entry(Class<?> type, String path) {
m_type = type;
m_path = path;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Entry) {
Entry e = (Entry) obj;
return e.getClass() == m_type && e.getPath().equals(m_path);
}
return false;
}
public String getPath() {
return m_path;
}
public Class<?> getType() {
return m_type;
}
@Override
public int hashCode() {
int hashcode = m_type.hashCode();
hashcode = hashcode * 31 + m_path.hashCode();
return hashcode;
}
}
}
package com.dianping.cat.storage.internal;
import java.io.IOException;
import org.jboss.netty.buffer.ChannelBuffer;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.annotation.Inject;
public class DefaultMessageBucket extends AbstractFileBucket<MessageTree> {
@Inject
private MessageCodec m_codec;
@Override
protected MessageTree decode(ChannelBuffer buf) throws IOException {
MessageTree tree = new DefaultMessageTree();
m_codec.decode(buf, tree);
return tree;
}
@Override
protected void encode(MessageTree tree, ChannelBuffer buf) throws IOException {
m_codec.encode(tree, buf);
}
public void setCodec(MessageCodec codec) {
m_codec = codec;
}
}
......@@ -169,5 +169,34 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>[B</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultMessageBucket</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.BucketManager</role>
<implementation>com.dianping.cat.storage.internal.DefaultBucketManager</implementation>
</component>
<component>
<role>com.dianping.cat.storage.BucketFactory</role>
<implementation>com.dianping.cat.storage.internal.DefaultBucketFactory</implementation>
</component>
</components>
</plexus>
......@@ -10,7 +10,7 @@ public abstract class CatTestCase extends ComponentTestCase {
@Before
public void before() throws Exception {
Cat.initialize(getContainer(), null);
Cat.setup(null, null);
Cat.setup(null);
}
@After
......
......@@ -25,6 +25,7 @@ public class HtmlMessageCodecTest {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
codec.setBufferWriter(new HtmlEncodingBufferWriter());
codec.setLogViewPrefix("/cat/m/");
codec.encodeMessage(message, buf, 0, null);
String actual = buf.toString(Charset.forName("utf-8"));
......@@ -68,7 +69,8 @@ public class HtmlMessageCodecTest {
tree.setHostName("hostName");
tree.setIpAddress("ipAddress");
tree.setMessageId("messageId");
tree.setRequestToken("requestToken");
tree.setParentMessageId("parentMessageId");
tree.setRootMessageId("rootMessageId");
tree.setSessionToken("sessionToken");
tree.setThreadId("threadId");
tree.setThreadName("threadName");
......@@ -164,6 +166,35 @@ public class HtmlMessageCodecTest {
+ "<tr><td>T15:33:42.087</td><td>URL</td><td>Review</td><td>0</td><td>100ms /review/2468</td></tr>\r\n");
}
@Test
public void testTransactionWithRemoteCall() {
long timestamp = 1325489621987L;
Transaction root = newTransaction("URL", "Review", timestamp, "0", 100, "/review/2468");
root.addChild(newEvent("URL", "Payload", timestamp, "0", "ip=127.0.0.1&ua=Mozilla 5.0...&refer=...&..."));
root.addChild(newTransaction("Service", "Auth", timestamp, "0", 20, "userId=1357&token=..."));
root.addChild(newTransaction("Cache", "findReviewByPK", timestamp + 22, "Missing", 1, "2468") //
.addChild(newEvent("CacheHost", "host-1", timestamp + 22, "0", "ip=192.168.8.123")));
root.addChild(newEvent("Service", "ReviewService", timestamp + 23, "0", "message_id"));
root.addChild(newEvent("RemoteCall", "Pigeon", timestamp + 23, "0", "message_id"));
root.addChild(newTransaction("DAL", "findReviewByPK", timestamp + 25, "0", 5,
"select title,content from Review where id = ?"));
root.addChild(newEvent("URL", "View", timestamp + 40, "0", "view=HTML"));
check(root,
"<tr><td>t15:33:41.987</td><td>URL</td><td>Review</td><td></td><td></td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;E15:33:41.987</td><td>URL</td><td>Payload</td><td>0</td><td>ip=127.0.0.1&amp;ua=Mozilla 5.0...&amp;refer=...&amp;...</td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;A15:33:41.987</td><td>Service</td><td>Auth</td><td>0</td><td>20ms userId=1357&amp;token=...</td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;t15:33:42.009</td><td>Cache</td><td>findReviewByPK</td><td></td><td></td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;&nbsp;&nbsp;E15:33:42.009</td><td>CacheHost</td><td>host-1</td><td>0</td><td>ip=192.168.8.123</td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;T15:33:42.010</td><td>Cache</td><td>findReviewByPK</td><td class=\"error\">Missing</td><td>1ms 2468</td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;E15:33:42.010</td><td>Service</td><td>ReviewService</td><td>0</td><td>message_id</td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;<a href=\"/cat/m/message_id\" onclick=\"show(1690722221);return false;\">[:: show ::]</a></td><td colspan=\"4\" id=\"1690722221\"></td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;A15:33:42.012</td><td>DAL</td><td>findReviewByPK</td><td>0</td><td>5ms select title,content from Review where id = ?</td></tr>\r\n"
+ "<tr><td>&nbsp;&nbsp;E15:33:42.027</td><td>URL</td><td>View</td><td>0</td><td>view=HTML</td></tr>\r\n"
+ "<tr><td>T15:33:42.087</td><td>URL</td><td>Review</td><td>0</td><td>100ms /review/2468</td></tr>\r\n");
}
@Test
public void testTransactionSimple() {
long timestamp = 1325489621987L;
......
......@@ -78,7 +78,8 @@ public class PlainTextMessageCodecTest {
tree.setHostName("hostName");
tree.setIpAddress("ipAddress");
tree.setMessageId("messageId");
tree.setRequestToken("requestToken");
tree.setParentMessageId("parentMessageId");
tree.setRootMessageId("rootMessageId");
tree.setSessionToken("sessionToken");
tree.setThreadId("threadId");
tree.setThreadName("threadName");
......@@ -128,7 +129,7 @@ public class PlainTextMessageCodecTest {
public void testMessageTree() {
DefaultMessageTree tree = newMessageTree();
long timestamp = 1325489621987L;
String expected = "PT1\tdomain\thostName\tipAddress\tthreadId\tthreadName\tmessageId\trequestToken\tsessionToken\n";
String expected = "PT1\tdomain\thostName\tipAddress\tthreadId\tthreadName\tmessageId\tparentMessageId\trootMessageId\tsessionToken\n";
checkTree(tree, expected);
......
package com.dianping.cat.storage;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class BucketPerfTest extends ComponentTestCase {
private int perfTimes = 100000;
/**
* Test method for {@link com.dianping.tkv.TkvImpl#get(java.lang.String)}.
*
* @throws IOException
*/
@Test
public void testPutDiffKeyWithoutTagsPerf() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<byte[]> bucket = manager.getBytesBucket("target/bucket/perf1");
String value = "0123456789";
long start = System.currentTimeMillis();
for (int i = 0; i < perfTimes; i++) {
bucket.storeById(String.valueOf(10000000 + i), value.getBytes());
}
System.out.println("testPutDiffKeyWithoutTagsPerf:" + (System.currentTimeMillis() - start));
}
@Test
public void testGetDiffKeyWithoutTagsPerf() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<byte[]> bucket = manager.getBytesBucket("target/bucket/perf2");
String value = "0123456789";
for (int i = 0; i < perfTimes; i++) {
bucket.storeById(String.valueOf(10000000 + i), value.getBytes());
}
long start = System.currentTimeMillis();
for (int i = 0; i < perfTimes; i++) {
bucket.findById(String.valueOf(10000000 + i));
}
System.out.println("testGetDiffKeyWithoutTagsPerf:" + (System.currentTimeMillis() - start));
}
@Test
public void testGetTagRecordPerf() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<byte[]> bucket = manager.getBytesBucket("target/bucket/perf2");
String value = "0123456789";
for (int i = 0; i < perfTimes; i++) {
bucket.storeById(String.valueOf(10000000 + i), value.getBytes(), "pet" + (i % 100));
}
long start = System.currentTimeMillis();
for (int i = 0; i < perfTimes; i++) {
bucket.findNextById(String.valueOf(10000000 + i), Direction.FORWARD, "pet" + (i % 100));
}
System.out.println("testGetTagRecordPerf:" + (System.currentTimeMillis() - start));
}
}
package com.dianping.cat.storage;
import junit.framework.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class BucketTest extends ComponentTestCase {
private DefaultMessageTree newMessageTree(String id) {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setDomain("domain");
tree.setHostName("hostName");
tree.setIpAddress("ipAddress");
tree.setMessageId(id);
tree.setParentMessageId("parentMessageId");
tree.setRootMessageId("rootMessageId");
tree.setSessionToken("sessionToken");
tree.setThreadId("threadId");
tree.setThreadName("threadName");
return tree;
}
@Test
public void testBytesBucket() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<byte[]> bucket = manager.getBytesBucket("target/bucket/bytes");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
boolean success = bucket.storeById(id, t1.getBytes());
if (success) {
String t2 = new String(bucket.findById(id));
Assert.assertEquals("Unable to find data after stored it.", t1, t2);
} else {
Assert.fail("Data failed to store at i=" + i + ".");
}
}
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(byte[].class, "target/bucket/bytes");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
String t2 = new String(bucket.findById(id));
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
}
@Test
public void testMessageBucket() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<MessageTree> bucket = manager.getMessageBucket("target/bucket/message");
int groups = 10;
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
MessageTree t1 = newMessageTree(id);
boolean success = bucket.storeById(id, t1, "r:" + (i % groups));
if (success) {
MessageTree t2 = bucket.findById(id);
Assert.assertEquals("Unable to find message after stored it.", t1.toString(), t2.toString());
} else {
Assert.fail("Message failed to store at i=" + i + ".");
}
}
// check next message in the same thread
for (int i = 0; i < groups - 1; i++) {
String id = "id" + (i * groups + i);
String nextId = "id" + ((i + 1) * groups + i);
String tag = "r:" + i;
MessageTree t1 = bucket.findNextById(id, Direction.FORWARD, tag);
MessageTree t2 = bucket.findById(nextId);
Assert.assertEquals("Unable to find next message in the thread " + i + ".", t1.toString(), t2.toString());
}
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(MessageTree.class, "target/bucket/message");
// check next message in the same thread
for (int i = 0; i < groups - 1; i++) {
String id = "id" + (i * groups + i);
String nextId = "id" + ((i + 1) * groups + i);
String tag = "r:" + i;
MessageTree t1 = bucket.findNextById(id, Direction.FORWARD, tag);
MessageTree t2 = bucket.findById(nextId);
Assert.assertEquals("Unable to find next message in the thread " + i + ".", t1.toString(), t2.toString());
}
}
@Test
public void testStringBucket() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<String> bucket = manager.getStringBucket("target/bucket/data");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
boolean success = bucket.storeById(id, t1);
if (success) {
String t2 = bucket.findById(id);
Assert.assertEquals("Unable to find data after stored it.", t1, t2);
} else {
Assert.fail("Data failed to store at i=" + i + ".");
}
}
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(String.class, "target/bucket/data");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
String t2 = bucket.findById(id);
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
}
}
......@@ -15,6 +15,10 @@
<groupId>com.dianping.cat</groupId>
<artifactId>cat-consumer</artifactId>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-job</artifactId>
</dependency>
<dependency>
<groupId>com.site.dal</groupId>
<artifactId>dal-xml</artifactId>
......
......@@ -11,8 +11,7 @@ import com.dianping.cat.report.tool.Constants;
import com.dianping.cat.report.tool.DateUtils;
import com.site.helper.Files;
public class ReportManager {
public abstract class ReportManager {
public String getRemotePageContent(String urlStr) {
try {
URL url = new URL(urlStr);
......
......@@ -24,7 +24,7 @@ public class ComponentsConfigurator extends AbstractWebComponentsConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
if (isEnv("dev")) {
if (isEnv("dev") || property("env", null) == null) {
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "realtime", "dump-to-html" }, "m_consumers"));
} else {
......@@ -33,24 +33,25 @@ public class ComponentsConfigurator extends AbstractWebComponentsConfigurator {
}
all.add(C(ServerConfig.class)//
.config(E("consumerServers").value("127.0.0.1:2281,127.0.0.1:2281"))//
.config(E("fileServer").value("127.0.0.1")));
.config(E("consumerServers").value("127.0.0.1:2281,127.0.0.1:2281"))//
.config(E("fileServer").value("127.0.0.1")));
all.add(C(ModelProvider.class,"failure",FailureModelProvider.class).req(MessageConsumer.class,"realtime"));
all.add(C(ModelProvider.class,"transaction",TransactionModelProvider.class).req(MessageConsumer.class,"realtime"));
all.add(C(ModelProvider.class,"ip",IpModelProvider.class).req(MessageConsumer.class,"realtime"));
all.add(C(ModelProvider.class, "failure", FailureModelProvider.class).req(MessageConsumer.class, "realtime"));
all.add(C(ModelProvider.class, "transaction", TransactionModelProvider.class).req(MessageConsumer.class,
"realtime"));
all.add(C(ModelProvider.class, "ip", IpModelProvider.class).req(MessageConsumer.class, "realtime"));
all.add(C(FailureManager.class));
all.add(C(TransactionManager.class));
all.add(C(IpManager.class));
//LAST
// Please keep it last
defineModuleRegistry(all, ReportModule.class, ReportModule.class);
return all;
}
......
package com.dianping.cat.report.page.failure;
import com.dianping.cat.consumer.failure.model.entity.Entry;
import com.dianping.cat.consumer.failure.model.entity.FailureReport;
import com.dianping.cat.consumer.failure.model.entity.Threads;
import com.dianping.cat.consumer.failure.model.transform.DefaultMerger;
public class FailureReportMerger extends DefaultMerger {
public FailureReportMerger(FailureReport failureReport) {
super(failureReport);
}
@Override
protected void mergeEntry(Entry old, Entry entry) {
// TODO Auto-generated method stub
super.mergeEntry(old, entry);
}
@Override
protected void mergeThreads(Threads old, Threads threads) {
old.getThreads().addAll(threads.getThreads());
}
}
package com.dianping.cat.report.page.transaction;
import java.util.List;
import com.dianping.cat.consumer.transaction.model.entity.TransactionName;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.entity.TransactionType;
import com.dianping.cat.consumer.transaction.model.transform.DefaultMerger;
public class TransactionReportMerger extends DefaultMerger {
public TransactionReportMerger(TransactionReport transactionReport) {
super(transactionReport);
}
public TransactionReport mergesFrom(TransactionReport report) {
report.accept(this);
return getTransactionReport();
}
public static TransactionReport merges(List<TransactionReport> reports) {
TransactionReportMerger merger = new TransactionReportMerger(new TransactionReport(""));
for (TransactionReport report : reports) {
report.accept(merger);
}
return merger.getTransactionReport();
}
@Override
protected void mergeName(TransactionName old, TransactionName other) {
old.setTotalCount(old.getTotalCount() + other.getTotalCount());
old.setFailCount(old.getFailCount() + other.getFailCount());
if (other.getMin() < old.getMin()) {
old.setMin(other.getMin());
}
if (other.getMax() > old.getMax()) {
old.setMax(other.getMax());
}
old.setSum(old.getSum() + other.getSum());
old.setSum2(old.getSum2() + other.getSum2());
if (old.getTotalCount() > 0) {
old.setFailPercent(old.getFailCount() * 100.0 / old.getTotalCount());
old.setAvg(old.getSum() / old.getTotalCount());
old.setStd(std(old.getTotalCount(), old.getAvg(), old.getSum2()));
}
}
@Override
protected void mergeType(TransactionType old, TransactionType other) {
old.setTotalCount(old.getTotalCount() + other.getTotalCount());
old.setFailCount(old.getFailCount() + other.getFailCount());
if (other.getMin() < old.getMin()) {
old.setMin(other.getMin());
}
if (other.getMax() > old.getMax()) {
old.setMax(other.getMax());
}
old.setSum(old.getSum() + other.getSum());
old.setSum2(old.getSum2() + other.getSum2());
if (old.getTotalCount() > 0) {
old.setFailPercent(old.getFailCount() * 100.0 / old.getTotalCount());
old.setAvg(old.getSum() / old.getTotalCount());
old.setStd(std(old.getTotalCount(), old.getAvg(), old.getSum2()));
}
}
protected double std(long count, double ave, double sum2) {
return Math.sqrt(sum2 / count - 2 * ave * ave + ave * ave);
}
}
......@@ -52,27 +52,26 @@ public class ReportUtils {
public static void mergeTransactionReport(TransactionReport targetReport, TransactionReport mergeReport) {
mergeReport.accept(new com.dianping.cat.consumer.transaction.model.transform.DefaultMerger(targetReport) {
private double std(long count, double ave, double sum2) {
return Math.sqrt(sum2 / count - 2 * ave * ave + ave * ave);
}
@Override
protected void mergeName(TransactionName old, TransactionName name) {
if(old.getId()==null){
if (old.getId() == null) {
System.out.println("TransactionName old is null");
old = name;
}
else if(name.getId()==null){
} else if (name.getId() == null) {
System.out.println("TransactionName new is null");
return;
}
// TODO Auto-generated method stub
//super.mergeName(old, name);
// super.mergeName(old, name);
old.setTotalCount(old.getTotalCount() + name.getTotalCount());
old.setFailCount(old.getFailCount() + name.getFailCount());
old.setFailPercent(old.getFailCount()* 100.0 / old.getTotalCount());
old.setFailPercent(old.getFailCount() * 100.0 / old.getTotalCount());
double min = name.getMin();
if (min < old.getMin()) {
old.setMin(min);
......@@ -83,11 +82,11 @@ public class ReportUtils {
old.setMax(max);
}
old.setSum(old.getSum()+name.getSum());
old.setAvg((double)old.getSum()/(double)old.getTotalCount());
old.setSum2(old.getSum2()+name.getSum2());
old.setSum(old.getSum() + name.getSum());
old.setAvg((double) old.getSum() / (double) old.getTotalCount());
old.setSum2(old.getSum2() + name.getSum2());
double std = std(old.getTotalCount(), old.getAvg(), old.getSum2());
old.setStd(std);
}
......@@ -96,26 +95,24 @@ public class ReportUtils {
protected void mergeTransactionReport(TransactionReport old, TransactionReport transactionReport) {
// TODO Auto-generated method stub
super.mergeTransactionReport(old, transactionReport);
}
@Override
protected void mergeType(TransactionType old, TransactionType name) {
if(old.getId()==null){
if (old.getId() == null) {
System.out.println("TransactionType old is null");
old = name;
}
else if(name.getId()==null){
} else if (name.getId() == null) {
System.out.println("TransactionType new is null");
return;
}
// TODO Auto-generated method stub
//super.mergeType(old, type);
// super.mergeType(old, type);
old.setTotalCount(old.getTotalCount() + name.getTotalCount());
old.setFailCount(old.getFailCount() + name.getFailCount());
old.setFailPercent( old.getFailCount()* 100.0 / old.getTotalCount());
old.setFailPercent(old.getFailCount() * 100.0 / old.getTotalCount());
double min = name.getMin();
if (min < old.getMin()) {
old.setMin(min);
......@@ -126,11 +123,11 @@ public class ReportUtils {
old.setMax(max);
}
old.setSum(old.getSum()+name.getSum());
old.setAvg((double)old.getSum()/(double)old.getTotalCount());
old.setSum2(old.getSum2()+name.getSum2());
old.setSum(old.getSum() + name.getSum());
old.setAvg((double) old.getSum() / (double) old.getTotalCount());
old.setSum2(old.getSum2() + name.getSum2());
double std = std(old.getTotalCount(), old.getAvg(), old.getSum2());
old.setStd(std);
}
......
......@@ -8,6 +8,7 @@
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hints>
<role-hint>realtime</role-hint>
<role-hint>dump-to-html</role-hint>
</role-hints>
<field-name>m_consumers</field-name>
</requirement>
......
......@@ -51,6 +51,13 @@
<tag-class>org.unidal.webres.taglib.basic.UseCssTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[Set the css value with EL or a css ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the link URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -72,13 +79,6 @@
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[Set the css value with EL or a css ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Target placement for this css resource to render]]></description>
<name>target</name>
<required>false</required>
......@@ -114,18 +114,18 @@
<tag-class>org.unidal.webres.taglib.basic.SetTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[The name.]]></description>
<name>id</name>
<description><![CDATA[The value]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.String</type>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[The value]]></description>
<name>value</name>
<description><![CDATA[The name.]]></description>
<name>id</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
<type>java.lang.String</type>
</attribute>
<dynamic-attributes>false</dynamic-attributes>
</tag>
......@@ -135,6 +135,13 @@
<tag-class>org.unidal.webres.taglib.basic.UseJsTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[Set the js value with EL or a js ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the link URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -156,13 +163,6 @@
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[Set the js value with EL or a js ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Target placement for this js resource to render]]></description>
<name>target</name>
<required>false</required>
......@@ -177,6 +177,13 @@
<tag-class>org.unidal.webres.taglib.basic.LinkTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[The value for link, could be a expression or a link ref.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the link URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -190,13 +197,6 @@
<rtexprvalue>true</rtexprvalue>
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[The value for link, could be a expression or a link ref.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<dynamic-attributes>true</dynamic-attributes>
</tag>
<tag>
......@@ -205,6 +205,13 @@
<tag-class>org.unidal.webres.taglib.basic.ImageTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[The value for image, could be a expression or a image path.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the image URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -225,13 +232,6 @@
<rtexprvalue>true</rtexprvalue>
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[The value for image, could be a expression or a image path.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<dynamic-attributes>true</dynamic-attributes>
</tag>
<tag>
......
......@@ -5,9 +5,9 @@ import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import com.dianping.cat.report.page.ip.DisplayModelTest;
import com.dianping.cat.report.page.transaction.TransactionReportMergerTest;
import com.dianping.cat.report.tool.FailureReportToolTest;
import com.dianping.cat.report.tool.IpReportToolTest;
import com.dianping.cat.report.tool.TransactionReportToolTest;
@RunWith(Suite.class)
@SuiteClasses({
......@@ -20,7 +20,7 @@ FailureReportToolTest.class,
IpReportToolTest.class,
TransactionReportToolTest.class
TransactionReportMergerTest.class
})
public class AllTests {
......
......@@ -27,7 +27,7 @@ public class Demo extends ComponentTestCase {
Cat.initialize(getContainer(), configFile);
}
Cat.setup(null, null);
Cat.setup(null);
}
@After
......
package com.dianping.cat.report.page.transaction;
import org.junit.Assert;
import org.junit.Test;
import org.unidal.webres.helper.Files;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.transaction.TransactionReportMerger;
public class TransactionReportMergerTest {
@Test
public void testTransactionReportMerge() throws Exception {
String oldXml = Files.forIO().readFrom(getClass().getResourceAsStream("TransactionReportOld.xml"), "utf-8");
String newXml = Files.forIO().readFrom(getClass().getResourceAsStream("TransactionReportNew.xml"), "utf-8");
TransactionReport reportOld = new DefaultXmlParser().parse(oldXml);
TransactionReport reportNew = new DefaultXmlParser().parse(newXml);
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("TransactionReportMergeResult.xml"),
"utf-8");
TransactionReportMerger merger = new TransactionReportMerger(reportOld);
merger.mergesFrom(reportNew);
String actual = new DefaultXmlBuilder().buildXml(reportOld);
Assert.assertEquals("Check the merge result!", expected.replace("\r", ""), actual.replace("\r", ""));
}
}
......@@ -9,17 +9,18 @@ import com.dianping.cat.consumer.failure.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.failure.model.transform.DefaultXmlParser;
public class FailureReportToolTest {
@Test
public void testFailureReportMerge() throws Exception{
String oldXml = Files.forIO().readFrom(FailureReportToolTest.class.getResourceAsStream("FailureReportOld.xml"),"utf-8");
String newXml = Files.forIO().readFrom(FailureReportToolTest.class.getResourceAsStream("FailureReportNew.xml"),"utf-8");
public void testFailureReportMerge() throws Exception {
String oldXml = Files.forIO().readFrom(getClass().getResourceAsStream("FailureReportOld.xml"), "utf-8");
String newXml = Files.forIO().readFrom(getClass().getResourceAsStream("FailureReportNew.xml"), "utf-8");
FailureReport reportOld = new DefaultXmlParser().parse(oldXml);
FailureReport reportNew = new DefaultXmlParser().parse(newXml);
String result = Files.forIO().readFrom(FailureReportToolTest.class.getResourceAsStream("FailureReportMergeResult.xml"),"utf-8");
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("FailureReportMergeResult.xml"), "utf-8");
ReportUtils.mergeFailureReport(reportOld, reportNew);
Assert.assertEquals("Chech the merage result!",result,new DefaultXmlBuilder().buildXml(reportOld));
String actual = new DefaultXmlBuilder().buildXml(reportOld);
Assert.assertEquals("Chech the merage result!", expected.replace("\r", ""), actual.replace("\r", ""));
}
}
......@@ -9,17 +9,18 @@ import com.dianping.cat.consumer.ip.model.transform.DefaultXmlParser;
import com.dianping.cat.consumer.ip.model.entity.IpReport;
public class IpReportToolTest {
@Test
public void testIpReportMerge() throws Exception{
String oldXml = Files.forIO().readFrom(IpReportToolTest.class.getResourceAsStream("IpReportOld.xml"),"utf-8");
String newXml = Files.forIO().readFrom(IpReportToolTest.class.getResourceAsStream("IpReportNew.xml"),"utf-8");
public void testIpReportMerge() throws Exception {
String oldXml = Files.forIO().readFrom(getClass().getResourceAsStream("IpReportOld.xml"), "utf-8");
String newXml = Files.forIO().readFrom(getClass().getResourceAsStream("IpReportNew.xml"), "utf-8");
IpReport reportOld = new DefaultXmlParser().parse(oldXml);
IpReport reportNew = new DefaultXmlParser().parse(newXml);
String result = Files.forIO().readFrom(IpReportToolTest.class.getResourceAsStream("IpReportMergeResult.xml"),"utf-8");
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("IpReportMergeResult.xml"), "utf-8");
ReportUtils.mergeIpReport(reportOld, reportNew);
Assert.assertEquals("Chech the merage result!",result,new DefaultXmlBuilder().buildXml(reportOld));
String actual = new DefaultXmlBuilder().buildXml(reportOld);
Assert.assertEquals("Chech the merage result!", expected.replace("\r", ""), actual.replace("\r", ""));
}
}
<transaction-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<<<<<<< HEAD:cat-home/src/test/resources/com/dianping/cat/report/tool/TransactionReportMergeResult.xml
<type id="URL" totalCount="22" failCount="0" failPercent="0.00" min="0.0" max="194.0" avg="47.1" sum="1036.0" sum2="147884.0" std="67.1">
=======
<type id="URL" totalCount="22" failCount="4" failPercent="18.18" min="0.0" max="194.0" avg="47.1" sum="1036.0" sum2="147884.0" std="67.1">
>>>>>>> 4eeaaceb4535ac3dd3133176a1c2222e14bc15e0:cat-home/src/test/resources/com/dianping/cat/report/page/transaction/TransactionReportMergeResult.xml
<name id="home" totalCount="2" failCount="0" failPercent="0.00" min="175.0" max="175.0" avg="175.0" sum="350.0" sum2="61250.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
</successMessageUrl>
......@@ -25,6 +29,7 @@
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
</successMessageUrl>
</name>
<<<<<<< HEAD:cat-home/src/test/resources/com/dianping/cat/report/tool/TransactionReportMergeResult.xml
<name id="TransitionPhase" totalCount="22" failCount="0" failPercent="0.00" min="0.0" max="1234.0" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
</successMessageUrl>
......@@ -32,6 +37,11 @@
<name id="OutboundPhase" totalCount="22" failCount="0" failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="964.0" sum2="136176.0" std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
</successMessageUrl>
=======
<name id="TransitionPhase" totalCount="22" failCount="4" failPercent="18.18" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
</name>
<name id="OutboundPhase" totalCount="22" failCount="7" failPercent="31.82" min="1.0" max="191.0" avg="43.8" sum="964.0" sum2="136176.0" std="65.3">
>>>>>>> 4eeaaceb4535ac3dd3133176a1c2222e14bc15e0:cat-home/src/test/resources/com/dianping/cat/report/page/transaction/TransactionReportMergeResult.xml
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
</successMessageUrl>
......
<transaction-report domain="Cat" startTime="2012-02-16 23:00:00"
endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00"
<type id="URL" totalCount="11" failCount="4" failPercent="0.00"
min="1.0" max="193.0" avg="47.1" sum="518.0" sum2="73942.0" std="67.1">
<name id="home" totalCount="1" failCount="0" failPercent="0.00"
min="175.0" max="175.0" avg="175.0" sum="175.0" sum2="30625.0" std="0.0">
......@@ -34,13 +34,18 @@
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
</successMessageUrl>
</name>
<<<<<<< HEAD:cat-home/src/test/resources/com/dianping/cat/report/tool/TransactionReportOld.xml
<name id="TransitionPhase" totalCount="11" failCount="0"
failPercent="0.00" min="0.0" max="1234" avg="0.0" sum="0.0" sum2="0.0"
=======
<name id="TransitionPhase" totalCount="11" failCount="4"
failPercent="0.00" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0"
>>>>>>> 4eeaaceb4535ac3dd3133176a1c2222e14bc15e0:cat-home/src/test/resources/com/dianping/cat/report/page/transaction/TransactionReportOld.xml
std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
</successMessageUrl>
</name>
<name id="OutboundPhase" totalCount="11" failCount="0"
<name id="OutboundPhase" totalCount="11" failCount="7"
failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="482.0" sum2="68088.0"
std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html
......
......@@ -7,8 +7,8 @@
<version>0.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-hadoop</artifactId>
<name>CAT Hadoop Analysis</name>
<artifactId>cat-job</artifactId>
<name>CAT Job Analysis</name>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
......@@ -18,6 +18,16 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.203.0</version>
<exclusions>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
......@@ -44,7 +54,7 @@
<goal>plexus</goal>
</goals>
<configuration>
<className>com.dianping.cat.hadoop.plexus.ComponentsConfigurator</className>
<className>com.dianping.cat.job.build.ComponentsConfigurator</className>
</configuration>
</execution>
</executions>
......@@ -59,7 +69,7 @@
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.dianping.cat.hadoop.job.BrowserAnalyzer</mainClass>
<mainClass>com.dianping.cat.job.job.BrowserAnalyzer</mainClass>
</manifest>
</archive>
</configuration>
......
package com.dianping.cat.hadoop;
package com.dianping.cat.job;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageStorage;
......
package com.dianping.cat.hadoop.plexus;
package com.dianping.cat.job.build;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.hadoop.HdfsDumpConsumer;
import com.dianping.cat.hadoop.hdfs.ChannelManager;
import com.dianping.cat.hadoop.hdfs.DefaultChannelManager;
import com.dianping.cat.hadoop.hdfs.DefaultOutputChannel;
import com.dianping.cat.hadoop.hdfs.HdfsMessageStorage;
import com.dianping.cat.hadoop.hdfs.OutputChannel;
import com.dianping.cat.job.HdfsDumpConsumer;
import com.dianping.cat.job.hdfs.OutputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannel;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.job.hdfs.OutputChannel;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
......@@ -25,20 +25,20 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
.config(E("maxSize").value(String.valueOf(2 * 1024 * 1024L))));
all.add(C(ChannelManager.class, DefaultChannelManager.class) //
all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class));
} else {
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
.config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L))));
all.add(C(ChannelManager.class, DefaultChannelManager.class) //
all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class) //
.config(E("baseDir").value("data"), //
E("serverUri").value("/catlog")));
}
all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
.req(ChannelManager.class));
.req(OutputChannelManager.class));
all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) //
.req(MessageStorage.class, "hdfs"));
......
package com.dianping.cat.hadoop.hdfs;
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
......@@ -51,8 +51,16 @@ public class DefaultOutputChannel implements OutputChannel {
return now - m_timestamp > m_ttl;
}
public void setMaxSize(int maxSize) {
m_maxSize = maxSize;
}
public void setTtl(long ttl) {
m_ttl = ttl;
}
@Override
public boolean out(MessageTree tree) throws IOException {
public boolean write(MessageTree tree) throws IOException {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
......@@ -72,12 +80,4 @@ public class DefaultOutputChannel implements OutputChannel {
return true;
}
public void setMaxSize(int maxSize) {
m_maxSize = maxSize;
}
public void setTtl(long ttl) {
m_ttl = ttl;
}
}
package com.dianping.cat.hadoop.hdfs;
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
......@@ -23,7 +23,7 @@ import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultChannelManager extends ContainerHolder implements ChannelManager, Initializable, LogEnabled {
public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled {
@Inject
private MessagePathBuilder m_builder;
......@@ -85,7 +85,36 @@ public class DefaultChannelManager extends ContainerHolder implements ChannelMan
}
@Override
public OutputChannel findChannel(MessageTree tree, boolean forceNew) throws IOException {
public void initialize() throws InitializationException {
try {
Configuration config = new Configuration();
FileSystem fs;
config.setInt("io.file.buffer.size", 8192);
if (m_serverUri == null) {
fs = FileSystem.getLocal(config);
} else {
fs = FileSystem.get(m_serverUri, config); // TODO Not tested yet
}
m_fs = fs;
m_basePath = new Path(m_fs.getWorkingDirectory(), m_baseDir);
} catch (Exception e) {
throw new InitializationException("Error when getting HDFS file system.", e);
}
try {
InetAddress localHost = InetAddress.getLocalHost();
m_ipAddress = localHost.getHostAddress();
} catch (UnknownHostException e) {
m_logger.warn("Unable to get local host!", e);
}
}
@Override
public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException {
String path = m_builder.getHdfsPath(tree, m_ipAddress);
OutputChannel channel = m_channels.get(path);
......@@ -115,35 +144,6 @@ public class DefaultChannelManager extends ContainerHolder implements ChannelMan
return channel;
}
@Override
public void initialize() throws InitializationException {
try {
Configuration config = new Configuration();
FileSystem fs;
config.setInt("io.file.buffer.size", 8192);
if (m_serverUri == null) {
fs = FileSystem.getLocal(config);
} else {
fs = FileSystem.get(m_serverUri, config); // TODO Not tested yet
}
m_fs = fs;
m_basePath = new Path(m_fs.getWorkingDirectory(), m_baseDir);
} catch (Exception e) {
throw new InitializationException("Error when getting HDFS file system.", e);
}
try {
InetAddress localHost = InetAddress.getLocalHost();
m_ipAddress = localHost.getHostAddress();
} catch (UnknownHostException e) {
m_logger.warn("Unable to get local host!", e);
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
......
package com.dianping.cat.hadoop.hdfs;
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
......@@ -17,7 +17,7 @@ import com.site.lookup.annotation.Inject;
public class HdfsMessageStorage implements MessageStorage, Initializable, Disposable, LogEnabled {
@Inject
private ChannelManager m_manager;
private OutputChannelManager m_manager;
private WriteJob m_job;
......@@ -76,14 +76,14 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos
private void handle(MessageTree tree) {
try {
OutputChannel channel = m_manager.findChannel(tree, false);
boolean success = channel.out(tree);
OutputChannel channel = m_manager.openChannel(tree, false);
boolean success = channel.write(tree);
if (!success) {
m_manager.closeChannel(channel);
channel = m_manager.findChannel(tree, true);
channel.out(tree);
channel = m_manager.openChannel(tree, true);
channel.write(tree);
}
} catch (IOException e) {
m_logger.error("Error when writing to HDFS!", e);
......
package com.dianping.cat.hadoop.hdfs;
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
......@@ -7,13 +7,16 @@ import com.dianping.cat.message.spi.MessageTree;
public interface OutputChannel {
/**
* Output the message tree to the HDFS.
* Close the channel.
*/
public void close();
/**
* Initialize the channel with an output stream.
*
* @param tree
* @return false if the max size is reached, false otherwise.
* @throws IOException
* @param out
*/
public boolean out(MessageTree tree) throws IOException;
public void initialize(OutputStream out);
/**
* Check if the channel is expired.
......@@ -23,14 +26,11 @@ public interface OutputChannel {
public boolean isExpired();
/**
* Close the channel.
*/
public void close();
/**
* Initialize the channel with an output stream.
* Output the message tree to the HDFS.
*
* @param out
* @param tree
* @return false if the max size is reached, false otherwise.
* @throws IOException
*/
public void initialize(OutputStream out);
public boolean write(MessageTree tree) throws IOException;
}
package com.dianping.cat.hadoop.hdfs;
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import com.dianping.cat.message.spi.MessageTree;
public interface ChannelManager {
public OutputChannel findChannel(MessageTree tree, boolean forceNew) throws IOException;
public interface OutputChannelManager {
public void cleanupChannels();
public void closeAllChannels();
public void closeChannel(OutputChannel channel);
public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException;
}
package com.dianping.cat.hadoop.job;
package com.dianping.cat.job.job;
import java.io.File;
import java.io.IOException;
......@@ -16,8 +16,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.dianping.cat.hadoop.mapreduce.MessageTreeInputFormat;
import com.dianping.cat.hadoop.mapreduce.MessageTreeWritable;
import com.dianping.cat.job.mapreduce.MessageTreeInputFormat;
import com.dianping.cat.job.mapreduce.MessageTreeWritable;
import com.dianping.cat.message.Message;
import com.site.helper.Files;
......
package com.dianping.cat.hadoop.mapreduce;
package com.dianping.cat.job.mapreduce;
import java.io.BufferedInputStream;
import java.io.IOException;
......
package com.dianping.cat.hadoop.mapreduce;
package com.dianping.cat.job.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
......
<plexus>
<components>
<component>
<role>com.dianping.cat.hadoop.hdfs.OutputChannel</role>
<implementation>com.dianping.cat.hadoop.hdfs.DefaultOutputChannel</implementation>
<role>com.dianping.cat.job.hdfs.OutputChannel</role>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannel</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<maxSize>2097152</maxSize>
......@@ -15,8 +15,8 @@
</requirements>
</component>
<component>
<role>com.dianping.cat.hadoop.hdfs.ChannelManager</role>
<implementation>com.dianping.cat.hadoop.hdfs.DefaultChannelManager</implementation>
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
......@@ -26,17 +26,17 @@
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
<implementation>com.dianping.cat.hadoop.hdfs.HdfsMessageStorage</implementation>
<implementation>com.dianping.cat.job.hdfs.HdfsMessageStorage</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.hadoop.hdfs.ChannelManager</role>
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dump-to-hdfs</role-hint>
<implementation>com.dianping.cat.hadoop.HdfsDumpConsumer</implementation>
<implementation>com.dianping.cat.job.HdfsDumpConsumer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStorage</role>
......
package com.dianping.cat.hadoop.hdfs;
package com.dianping.cat.job.hdfs;
import org.junit.After;
import org.junit.Before;
......@@ -10,7 +10,7 @@ public abstract class CatTestCase extends ComponentTestCase {
@Before
public void before() throws Exception {
Cat.initialize(getContainer(), null);
Cat.setup(null, null);
Cat.setup(null);
}
@After
......
package com.dianping.cat.hadoop.hdfs;
package com.dianping.cat.job.hdfs;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.hadoop.hdfs.HdfsMessageStorage;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.1.0</version>
<name>Tracking</name>
<packaging>pom</packaging>
<modules>
<module>cat-core</module>
<module>cat-consumer</module>
<module>cat-home</module>
<module>cat-hadoop</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-consumer</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-container-default</artifactId>
<version>1.0-alpha-47</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<version>1.0.0-a1</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>web-framework</artifactId>
<version>1.0.0-a3</version>
</dependency>
<dependency>
<groupId>org.unidal.webres</groupId>
<artifactId>WebResServer</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.site.dal</groupId>
<artifactId>dal-jdbc</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.site.app</groupId>
<artifactId>app-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.12</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.7.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.9</version>
<configuration>
<includes>
<include>**/AllTests.java</include>
</includes>
<!--
<debugForkedProcess>-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 -Xnoagent -Djava.compiler=NONE</debugForkedProcess>
-->
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>2.9</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<additionalConfig>
<file>
<name>.settings/org.eclipse.jdt.core.prefs</name>
<content><![CDATA[
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.1.0</version>
<name>Tracking</name>
<packaging>pom</packaging>
<modules>
<module>cat-core</module>
<module>cat-consumer</module>
<module>cat-home</module>
<module>cat-job</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-consumer</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-job</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-container-default</artifactId>
<version>1.0-alpha-47</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<version>1.0.0-a1</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>web-framework</artifactId>
<version>1.0.0-a3</version>
</dependency>
<dependency>
<groupId>org.unidal.webres</groupId>
<artifactId>WebResServer</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.site.dal</groupId>
<artifactId>dal-jdbc</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.site.app</groupId>
<artifactId>app-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.12</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.7.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.9</version>
<configuration>
<includes>
<include>**/AllTests.java</include>
</includes>
<!-- <debugForkedProcess>-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 -Xnoagent -Djava.compiler=NONE</debugForkedProcess> -->
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>2.9</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<additionalConfig>
<file>
<name>.settings/org.eclipse.jdt.core.prefs</name>
<content><![CDATA[
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.source=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
]]></content>
</file>
<file>
<name>.settings/org.eclipse.core.resources.prefs</name>
<content>
</file>
<file>
<name>.settings/org.eclipse.core.resources.prefs</name>
<content>
<![CDATA[
eclipse.preferences.version=1
encoding/<project>=UTF-8
]]>
</content>
</file>
</additionalConfig>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>package</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>releases</id>
<url>http://tech-wuqim.dianpingoa.com/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<url>http://tech-wuqim.dianpingoa.com/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<properties>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
</properties>
</content>
</file>
</additionalConfig>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>package</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>releases</id>
<url>http://tech-wuqim.dianpingoa.com/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<url>http://tech-wuqim.dianpingoa.com/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<properties>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
</properties>
</project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册