提交 5a0e6a86 编写于 作者: Y youyong

modify the cat home

......@@ -17,10 +17,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
......@@ -34,7 +31,7 @@
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.6</version>
<version>1.0.8</version>
<executions>
<execution>
<id>generate failure report model</id>
......@@ -57,8 +54,7 @@
</configuration>
</execution>
<execution>
<!-- <id>generate IP report model</id> -->
<id>default-cli</id>
<id>generate IP report model</id>
<phase>generate-sources</phase>
<goals>
<goal>dal-model</goal>
......@@ -67,6 +63,36 @@
<manifest>${basedir}/src/main/resources/META-INF/dal/model/ip-manifest.xml</manifest>
</configuration>
</execution>
<execution>
<id>generate plexus component descriptor</id>
<phase>process-classes</phase>
<goals>
<goal>plexus</goal>
</goals>
<configuration>
<className>com.dianping.cat.consumer.configuration.ComponentsConfigurator</className>
<env>dev</env>
<profile>
<env id="dev">
<property name="n1">v10</property>
<property name="n2">v20</property>
<property name="extraTime">12345</property>
</env>
<env id="alpha">
<property name="n1">v11</property>
<property name="n2">v21</property>
</env>
<env id="beta">
<property name="n1">v12</property>
<property name="n2">v22</property>
</env>
<env id="production">
<property name="n1">v13</property>
<property name="n2">v23</property>
</env>
</profile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
......
......@@ -31,18 +31,18 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.req(AnalyzerFactory.class).config(E("consumerId").value("realtime") //
, E("extraTime").value("300000")//
, E("extraTime").value(property("extraTime", "300000"))//
, E("analyzerNames").value("failure,transaction,ip")));
String failureTypes = "Error,RuntimeException,Exception";
all.add(C(Handler.class, "failure-handler", FailureHandler.class)//
.config(E("failureType").value(failureTypes))//
.req(MessageStorage.class,"html"));
.req(MessageStorage.class, "html"));
all.add(C(Handler.class, "long-url-handler", LongUrlHandler.class) //
.config(E("threshold").value("2000"))//
.req(MessageStorage.class,"html"));
.req(MessageStorage.class, "html"));
all.add(C(FailureReportAnalyzer.class).is(PER_LOOKUP) //
.config(E("reportPath").value("target/report/failure/")) //
......@@ -51,9 +51,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(TransactionReportAnalyzer.class).is(PER_LOOKUP) //
.req(MessageManager.class) //
.config(E("reportPath").value("target/report/transaction/"))
.req(MessageStorage.class, "html"));
.config(E("reportPath").value("target/report/transaction/")).req(MessageStorage.class, "html"));
all.add(C(IpAnalyzer.class).is(PER_LOOKUP));
return all;
......
......@@ -4,13 +4,13 @@ import org.junit.Assert;
import org.junit.Test;
import com.dianping.cat.consumer.ip.model.entity.IpReport;
import com.dianping.cat.consumer.ip.model.transform.DefaultParser;
import com.dianping.cat.consumer.ip.model.transform.DefaultXmlParser;
import com.site.helper.Files;
public class IpAnalyzerTest {
@Test
public void testXml() throws Exception {
DefaultParser parser = new DefaultParser();
DefaultXmlParser parser = new DefaultXmlParser();
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("ip.xml"), "utf-8");
IpReport report = parser.parse(expected);
......
......@@ -10,10 +10,6 @@
<artifactId>cat-core</artifactId>
<name>CAT Core</name>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
......@@ -22,6 +18,11 @@
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
......
......@@ -31,9 +31,15 @@ public class Cat {
private MessageManager m_manager;
private PlexusContainer m_container;
private Cat() {
}
public static void destroy() {
s_instance = new Cat();
}
static Cat getInstance() {
if (!s_instance.m_initialized) {
try {
......@@ -100,6 +106,7 @@ public class Cat {
config.accept(validator);
getInstance().m_manager.initializeClient(config);
} else {
getInstance().m_manager.initializeClient(null);
System.out.println("[WARN] Cat client is disabled due to no config file found!");
}
}
......@@ -108,6 +115,15 @@ public class Cat {
return s_instance.m_initialized;
}
public static <T> T lookup(Class<T> role) throws ComponentLookupException {
return lookup(role, null);
}
@SuppressWarnings("unchecked")
public static <T> T lookup(Class<T> role, String hint) throws ComponentLookupException {
return (T) getInstance().m_container.lookup(role, hint);
}
// this should be called when a thread ends to clean some thread local data
public static void reset() {
getInstance().m_manager.reset();
......@@ -120,6 +136,8 @@ public class Cat {
}
void setContainer(PlexusContainer container) {
m_container = container;
try {
m_manager = (MessageManager) container.lookup(MessageManager.class);
} catch (ComponentLookupException e) {
......
......@@ -64,12 +64,12 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageCodec.class, "html", HtmlMessageCodec.class) //
.req(BufferWriter.class, "html-encode"));
all.add(C(MessageConsumer.class, "dummy", DummyConsumer.class));
all.add(C(MessageConsumer.class, "dump-to-html", DumpToHtmlConsumer.class) //
all.add(C(MessageConsumer.class, DummyConsumer.ID, DummyConsumer.class));
all.add(C(MessageConsumer.class, DumpToHtmlConsumer.ID, DumpToHtmlConsumer.class) //
.req(MessageStorage.class, "html") //
.req(MessagePathBuilder.class));
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "dummy" }, "m_consumers"));
.req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers"));
all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) //
.is(PER_LOOKUP) //
......
......@@ -114,7 +114,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
m_hostName = localHost.getHostName();
m_ipAddress = localHost.getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
m_logger.warn("Unable to get local host!", e);
}
m_manager = lookup(TransportManager.class);
......@@ -142,7 +142,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
public void start(Transaction transaction) {
if (Cat.isInitialized()) {
getContext().start(transaction);
} else if (m_firstMessage){
} else if (m_firstMessage) {
m_firstMessage = false;
m_logger.warn("CAT client is not enabled because it's not initialized yet");
}
......@@ -168,14 +168,15 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
m_tree.setHostName(hostName);
m_tree.setIpAddress(ipAddress);
m_tree.setMessageId(UUID.randomUUID().toString()); // TODO optimize it
// to shorter UUID
}
public void add(DefaultMessageManager manager, Message message) {
if (m_stack.isEmpty()) {
m_tree.setMessage(message);
manager.flush(m_tree);
MessageTree tree = m_tree.copy();
tree.setMessage(message);
tree.setMessageId(UUID.randomUUID().toString());
manager.flush(tree);
} else {
Transaction entry = m_stack.peek();
......@@ -196,7 +197,11 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
m_stack.pop();
if (m_stack.isEmpty()) {
manager.flush(m_tree);
MessageTree tree = m_tree.copy();
m_tree.setMessage(null);
tree.setMessageId(UUID.randomUUID().toString());
manager.flush(tree);
}
}
}
......
......@@ -4,6 +4,8 @@ import java.io.File;
import java.net.URL;
public interface MessagePathBuilder {
public String getHdfsPath(MessageTree tree, String host);
public File getLogViewBaseDir();
public URL getLogViewBaseUrl();
......
......@@ -2,7 +2,9 @@ package com.dianping.cat.message.spi;
import com.dianping.cat.message.Message;
public interface MessageTree {
public interface MessageTree extends Cloneable {
public MessageTree copy();
public String getDomain();
public String getHostName();
......
......@@ -4,9 +4,11 @@ import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageTree;
public class DummyConsumer implements MessageConsumer {
public static final String ID = "dummy";
@Override
public String getConsumerId() {
return "dummy";
return ID;
}
@Override
......
......@@ -14,17 +14,24 @@ import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class DumpToHtmlConsumer implements MessageConsumer, Initializable, LogEnabled {
@Inject
public static final String ID = "dump-to-html";
@Inject(value = "html")
private MessageStorage m_storage;
@Inject
private MessagePathBuilder m_builder;
@Inject
private String m_domain;
private Logger m_logger;
@Override
public void consume(MessageTree tree) {
m_storage.store(tree);
if (m_domain == null || m_domain.equals(tree.getDomain())) {
m_storage.store(tree);
}
}
@Override
......@@ -34,13 +41,12 @@ public class DumpToHtmlConsumer implements MessageConsumer, Initializable, LogEn
@Override
public String getConsumerId() {
return "dump-to-file";
return ID;
}
@Override
public String getDomain() {
// no limitation
return null;
return m_domain;
}
@Override
......@@ -48,6 +54,10 @@ public class DumpToHtmlConsumer implements MessageConsumer, Initializable, LogEn
File baseDir = m_builder.getLogViewBaseDir();
baseDir.mkdirs();
m_logger.info(String.format("Message will be dumpped to %s in HTML.", baseDir));
m_logger.info(String.format("Message will be dumpped to %s.", baseDir));
}
public void setDomain(String domain) {
m_domain = domain;
}
}
......@@ -24,6 +24,15 @@ public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializa
private URL m_baseLogUrl;
@Override
public String getHdfsPath(MessageTree tree, String host) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/{1}/{0,date,mm}-{2}");
Date date = new Date(tree.getMessage().getTimestamp());
String path = format.format(new Object[] { date, tree.getDomain(), host });
return path;
}
@Override
public File getLogViewBaseDir() {
return m_baseLogDir;
......
......@@ -65,7 +65,7 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis
public void append(MessageTree tree) {
try {
m_queue.put(tree);
m_queue.offer(tree);
} catch (Exception e) {
m_logger.warn("Error when adding job to queue.", e);
}
......@@ -106,8 +106,7 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis
public void run() {
try {
while (m_active) {
MessageTree tree = m_queue.poll(1000 * 1000L, TimeUnit.NANOSECONDS); // 1
// ms
MessageTree tree = m_queue.poll(1000 * 1000L, TimeUnit.NANOSECONDS);
if (tree != null) {
handle(tree);
......@@ -125,12 +124,12 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis
}
}
} catch (Exception e) {
m_logger.warn("Error when processing job in queue.", e);
m_logger.warn("Error when writing message to local file system.", e);
}
}
public void shutdown() {
m_active = true;
m_active = false;
}
}
}
......@@ -28,6 +28,23 @@ public class DefaultMessageTree implements MessageTree {
private Message m_message;
@Override
public DefaultMessageTree copy() {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setDomain(m_domain);
tree.setHostName(m_hostName);
tree.setIpAddress(m_ipAddress);
tree.setMessageId(m_messageId);
tree.setRequestToken(m_requestToken);
tree.setSessionToken(m_sessionToken);
tree.setThreadId(m_threadId);
tree.setThreadName(m_threadName);
tree.setMessage(m_message);
return tree;
}
@Override
public String getDomain() {
return m_domain;
......
package com.dianping.cat.message;
import org.junit.After;
import org.junit.Before;
import com.dianping.cat.Cat;
import com.site.lookup.ComponentTestCase;
public abstract class CatTestCase extends ComponentTestCase {
@Before
public void before() throws Exception {
Cat.initialize(getContainer(), null);
Cat.setup(null, null);
}
@After
public void after() throws Exception {
Cat.reset();
Cat.destroy();
}
}
\ No newline at end of file
package com.dianping.cat.message;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.Cat;
public class EventTest {
public static MessageProducer CAT = Cat.getProducer();
@Before
public void before() {
Cat.setup(null, null);
}
@After
public void after() {
Cat.reset();
}
@RunWith(JUnit4.class)
public class EventTest extends CatTestCase {
@Test
public void testNormal() {
Event event = CAT.newEvent("Review", "New");
Event event = Cat.getProducer().newEvent("Review", "New");
event.addData("id", 12345);
event.addData("user", "john");
......@@ -31,15 +20,15 @@ public class EventTest {
@Test
public void testException() {
CAT.logError(new RuntimeException());
Cat.getProducer().logError(new RuntimeException());
}
@Test
public void testInOneShot() {
// Normal case
CAT.logEvent("Review", "New", Message.SUCCESS, "id=12345&user=john");
Cat.getProducer().logEvent("Review", "New", Message.SUCCESS, "id=12345&user=john");
// Exception case
CAT.logError(new RuntimeException());
Cat.getProducer().logError(new RuntimeException());
}
}
package com.dianping.cat.message;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.Cat;
public class HeartbeatTest {
private static final MessageProducer CAT = Cat.getProducer();
@Before
public void before() {
Cat.setup(null, null);
}
@After
public void after() {
Cat.reset();
}
@RunWith(JUnit4.class)
public class HeartbeatTest extends CatTestCase {
@Test
public void testInOneShot() {
CAT.logHeartbeat("System", "Status", "0",
Cat.getProducer().logHeartbeat("System", "Status", "0",
"ip=192.168.10.111&host=host-1&load=2.1&cpu=0.12,0.10&memory.total=2G&memory.free=456M");
}
@Test
public void testService() {
Heartbeat heartbeat = CAT.newHeartbeat("Service", "ReviewService");
Heartbeat heartbeat = Cat.getProducer().newHeartbeat("Service", "ReviewService");
heartbeat.addData("host", "192.168.10.112:1234");
heartbeat.addData("weight", "20");
......@@ -40,7 +29,7 @@ public class HeartbeatTest {
@Test
public void testStatus() {
Heartbeat heartbeat = CAT.newHeartbeat("System", "Status");
Heartbeat heartbeat = Cat.getProducer().newHeartbeat("System", "Status");
heartbeat.addData("ip", "192.168.10.111");
heartbeat.addData("host", "host-1");
......
package com.dianping.cat.message;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.Cat;
public class TransactionTest {
private static final MessageProducer CAT = Cat.getProducer();
@Before
public void before() {
Cat.setup(null, null);
}
@After
public void after() {
Cat.reset();
}
@RunWith(JUnit4.class)
public class TransactionTest extends CatTestCase {
@Test
public void testNormal() {
Transaction t = CAT.newTransaction("URL", "MyPage");
Transaction t = Cat.getProducer().newTransaction("URL", "MyPage");
try {
// do your business here
......
......@@ -2,37 +2,19 @@ package com.dianping.cat.message.internal;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.message.CatTestCase;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class MessageProducerTest extends ComponentTestCase {
@Before
public void before() throws Exception {
MessageManager manager = lookup(MessageManager.class);
manager.initializeClient(null);
manager.setup(null, null);
}
@After
public void after() throws Exception {
MessageManager manager = lookup(MessageManager.class);
manager.reset();
}
public class MessageProducerTest extends CatTestCase {
@Test
public void testNormal() throws Exception {
MessageProducer producer = lookup(MessageProducer.class);
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-hadoop</artifactId>
<name>CAT Hadoop Analysis</name>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.203.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.7</version>
<executions>
<execution>
<id>generate plexus component descriptor</id>
<phase>process-classes</phase>
<goals>
<goal>plexus</goal>
</goals>
<configuration>
<className>com.dianping.cat.hadoop.plexus.ComponentsConfigurator</className>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.dianping.cat.hadoop;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class HdfsDumpConsumer implements MessageConsumer {
public static final String ID = "dump-to-hdfs";
@Inject
private MessageStorage m_storage;
@Inject
private String m_domain;
@Override
public void consume(MessageTree tree) {
if (m_domain == null || m_domain.equals(tree.getDomain())) {
m_storage.store(tree);
}
}
@Override
public String getConsumerId() {
return ID;
}
@Override
public String getDomain() {
return m_domain;
}
public void setDomain(String domain) {
m_domain = domain;
}
}
package com.dianping.cat.hadoop.hdfs;
import java.io.IOException;
import com.dianping.cat.message.spi.MessageTree;
public interface ChannelManager {
public OutputChannel findChannel(MessageTree tree, boolean forceNew) throws IOException;
public void cleanupChannels();
public void closeAllChannels();
public void closeChannel(OutputChannel channel);
}
package com.dianping.cat.hadoop.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultChannelManager extends ContainerHolder implements ChannelManager, Initializable, LogEnabled {
@Inject
private MessagePathBuilder m_builder;
@Inject
private String m_baseDir = "target/hdfs";
@Inject
private URI m_serverUri;
private FileSystem m_fs;
private Path m_basePath;
private String m_ipAddress;
private Map<String, OutputChannel> m_channels = new HashMap<String, OutputChannel>();
private Map<String, Integer> m_indexes = new HashMap<String, Integer>();
private Logger m_logger;
@Override
public void cleanupChannels() {
try {
List<String> expired = new ArrayList<String>();
for (Map.Entry<String, OutputChannel> e : m_channels.entrySet()) {
if (e.getValue().isExpired()) {
expired.add(e.getKey());
}
}
for (String path : expired) {
OutputChannel channel = m_channels.remove(path);
closeChannel(channel);
}
} catch (Exception e) {
m_logger.warn("Error when doing cleanup!", e);
}
}
@Override
public void closeAllChannels() {
for (OutputChannel channel : m_channels.values()) {
closeChannel(channel);
}
}
@Override
public void closeChannel(OutputChannel channel) {
channel.close();
super.release(channel);
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public OutputChannel findChannel(MessageTree tree, boolean forceNew) throws IOException {
String path = m_builder.getHdfsPath(tree, m_ipAddress);
OutputChannel channel = m_channels.get(path);
if (channel == null) {
Path file = new Path(m_basePath, path + "-0");
OutputStream out = m_fs.create(file);
channel = lookup(OutputChannel.class);
channel.initialize(out);
m_indexes.put(path, 0);
m_channels.put(path, channel);
} else if (forceNew) {
int index = m_indexes.get(path);
closeChannel(channel);
m_indexes.put(path, ++index);
Path file = new Path(m_basePath, path + "-" + index);
OutputStream out = m_fs.create(file);
channel = lookup(OutputChannel.class);
channel.initialize(out);
m_channels.put(path, channel);
}
return channel;
}
@Override
public void initialize() throws InitializationException {
try {
Configuration config = new Configuration();
FileSystem fs;
config.setInt("io.file.buffer.size", 8192);
if (m_serverUri == null) {
fs = FileSystem.getLocal(config);
} else {
fs = FileSystem.get(m_serverUri, config); // TODO Not tested yet
}
m_fs = fs;
m_basePath = new Path(m_fs.getWorkingDirectory(), m_baseDir);
} catch (Exception e) {
throw new InitializationException("Error when getting HDFS file system.", e);
}
try {
InetAddress localHost = InetAddress.getLocalHost();
m_ipAddress = localHost.getHostAddress();
} catch (UnknownHostException e) {
m_logger.warn("Unable to get local host!", e);
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
public void setServerUri(String serverUri) {
m_serverUri = URI.create(serverUri);
}
}
package com.dianping.cat.hadoop.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class DefaultOutputChannel implements OutputChannel {
@Inject
private MessageCodec m_codec;
@Inject
private int m_maxSize = 0; // 0 means unlimited
@Inject
private long m_ttl = 90 * 1000L; // 90 seconds
private OutputStream m_out;
private int m_count;
private long m_timestamp;
@Override
public void close() {
if (m_out != null) {
try {
m_out.close();
m_out = null;
} catch (IOException e) {
// ignore it
}
}
}
@Override
public void initialize(OutputStream out) {
m_out = out;
m_timestamp = System.currentTimeMillis();
}
@Override
public boolean isExpired() {
long now = System.currentTimeMillis();
return now - m_timestamp > m_ttl;
}
@Override
public boolean out(MessageTree tree) throws IOException {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
int length = buf.readInt();
if (m_maxSize > 0 && m_count + length + 1 > m_maxSize) {
// exceed the max size
return false;
}
buf.getBytes(buf.readerIndex(), m_out, length);
// a blank line used to separate two message trees
m_out.write('\n');
m_count += length + 1;
return true;
}
public void setMaxSize(int maxSize) {
m_maxSize = maxSize;
}
public void setTtl(long ttl) {
m_ttl = ttl;
}
}
package com.dianping.cat.hadoop.hdfs;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class HdfsMessageStorage implements MessageStorage, Initializable, Disposable, LogEnabled {
@Inject
private ChannelManager m_manager;
private WriteJob m_job;
private Thread m_thread;
private Logger m_logger;
@Override
public void dispose() {
m_job.shutdown();
try {
m_thread.join();
} catch (InterruptedException e) {
// ignore it
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void initialize() throws InitializationException {
m_job = new WriteJob();
Thread thread = new Thread(m_job);
thread.setName("Storage write Job");
thread.start();
m_thread = thread;
}
@Override
public String store(MessageTree tree) {
m_job.append(tree);
// Not available
return null;
}
class WriteJob implements Runnable {
private BlockingQueue<MessageTree> m_queue = new LinkedBlockingQueue<MessageTree>();
private volatile boolean m_active = true;
public void append(MessageTree tree) {
try {
m_queue.offer(tree);
} catch (Exception e) {
m_logger.warn("Error when adding job to queue.", e);
}
}
private void handle(MessageTree tree) {
try {
OutputChannel channel = m_manager.findChannel(tree, false);
boolean success = channel.out(tree);
if (!success) {
m_manager.closeChannel(channel);
channel = m_manager.findChannel(tree, true);
channel.out(tree);
}
} catch (IOException e) {
m_logger.error("Error when writing to HDFS!", e);
}
}
private boolean isActive() {
synchronized (this) {
return m_active;
}
}
@Override
public void run() {
long lastCheckedTime = System.currentTimeMillis();
try {
while (isActive()) {
MessageTree tree = m_queue.poll(1000 * 1000L, TimeUnit.NANOSECONDS);
if (tree != null) {
handle(tree);
}
// check connection timeout and close it
if (System.currentTimeMillis() - lastCheckedTime >= 5 * 1000) {
lastCheckedTime = System.currentTimeMillis();
m_manager.cleanupChannels();
}
}
// process the remaining job in the queue
while (!isActive()) {
MessageTree tree = m_queue.poll();
if (tree != null) {
handle(tree);
} else {
break;
}
}
} catch (Exception e) {
m_logger.warn("Error when dumping message to HDFS.", e);
}
m_manager.closeAllChannels();
}
public void shutdown() {
synchronized (this) {
m_active = false;
}
}
}
}
package com.dianping.cat.hadoop.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import com.dianping.cat.message.spi.MessageTree;
public interface OutputChannel {
/**
* Output the message tree to the HDFS.
*
* @param tree
* @return false if the max size is reached, false otherwise.
* @throws IOException
*/
public boolean out(MessageTree tree) throws IOException;
/**
* Check if the channel is expired.
*
* @return true if the channel is expired, false otherwise.
*/
public boolean isExpired();
/**
* Close the channel.
*/
public void close();
/**
* Initialize the channel with an output stream.
*
* @param out
*/
public void initialize(OutputStream out);
}
package com.dianping.cat.hadoop.job;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class BrowserAnalysisJob {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text m_word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
m_word.set(itr.nextToken());
context.write(m_word, ONE);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(BrowserAnalysisJob.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
package com.dianping.cat.hadoop.plexus;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.hadoop.HdfsDumpConsumer;
import com.dianping.cat.hadoop.hdfs.ChannelManager;
import com.dianping.cat.hadoop.hdfs.DefaultChannelManager;
import com.dianping.cat.hadoop.hdfs.DefaultOutputChannel;
import com.dianping.cat.hadoop.hdfs.HdfsMessageStorage;
import com.dianping.cat.hadoop.hdfs.OutputChannel;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
if (isEnv("dev") || property("env", null) == null) {
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
.config(E("maxSize").value(String.valueOf(5 * 1024L))));
all.add(C(ChannelManager.class, DefaultChannelManager.class) //
.req(MessagePathBuilder.class));
} else {
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
.config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L))));
all.add(C(ChannelManager.class, DefaultChannelManager.class) //
.req(MessagePathBuilder.class) //
.config(E("baseDir").value("data"), //
E("serverUri").value("/catlog")));
}
all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
.req(ChannelManager.class));
all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) //
.req(MessageStorage.class, "hdfs"));
return all;
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
}
<plexus>
<components>
<component>
<role>com.dianping.cat.hadoop.hdfs.OutputChannel</role>
<implementation>com.dianping.cat.hadoop.hdfs.DefaultOutputChannel</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<maxSize>5120</maxSize>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.hadoop.hdfs.ChannelManager</role>
<implementation>com.dianping.cat.hadoop.hdfs.DefaultChannelManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
<implementation>com.dianping.cat.hadoop.hdfs.HdfsMessageStorage</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.hadoop.hdfs.ChannelManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dump-to-hdfs</role-hint>
<implementation>com.dianping.cat.hadoop.HdfsDumpConsumer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
package com.dianping.cat.hadoop.hdfs;
import org.junit.After;
import org.junit.Before;
import com.dianping.cat.Cat;
import com.site.lookup.ComponentTestCase;
public abstract class CatTestCase extends ComponentTestCase {
@Before
public void before() throws Exception {
Cat.initialize(getContainer(), null);
Cat.setup(null, null);
}
@After
public void after() throws Exception {
Cat.reset();
Cat.destroy();
}
}
\ No newline at end of file
package com.dianping.cat.hadoop.hdfs;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.hadoop.hdfs.HdfsMessageStorage;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
@RunWith(JUnit4.class)
public class HdfsMessageStorageTest extends CatTestCase {
@Test
public void test() throws Exception {
MessageStorage storage = lookup(MessageStorage.class, "hdfs");
MessageProducer producer = lookup(MessageProducer.class);
InMemoryQueue queue = lookup(InMemoryQueue.class);
for (int i = 0; i < 100; i++) {
Transaction t = producer.newTransaction("URL", "MyPage");
try {
// do your business here
t.addData("k1", "v1");
t.addData("k2", "v2");
t.addData("k3", "v3");
Thread.sleep(10);
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
t.setStatus(e);
} finally {
t.complete();
}
MessageTree tree = queue.poll(0);
storage.store(tree);
}
((HdfsMessageStorage) storage).dispose();
}
}
......@@ -15,13 +15,10 @@
<groupId>com.dianping.cat</groupId>
<artifactId>cat-consumer</artifactId>
</dependency>
<dependency>
<groupId>com.site.app</groupId>
<artifactId>app-core</artifactId>
</dependency>
<dependency>
<groupId>com.site.dal</groupId>
<artifactId>dal-jdbc</artifactId>
<artifactId>dal-xml</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
......@@ -41,11 +38,6 @@
<artifactId>jsp-2.1</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
......@@ -83,11 +75,4 @@
</plugin>
</plugins>
</build>
<properties>
<jdbc.driver>com.mysql.jdbc.Driver</jdbc.driver>
<jdbc.url>jdbc:mysql://tech-wuqim.dianpingoa.com:3306/garden</jdbc.url>
<jdbc.user>root</jdbc.user>
<jdbc.password>Passw0rd</jdbc.password>
<jdbc.connectionProperties><![CDATA[useUnicode=true&autoReconnect=true]]></jdbc.connectionProperties>
</properties>
</project>
......@@ -21,9 +21,14 @@ public class ComponentsConfigurator extends AbstractWebComponentsConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "realtime", "dump-to-html" }, "m_consumers"));
if (isEnv("dev")) {
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "realtime", "dump-to-html" }, "m_consumers"));
} else {
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "realtime" }, "m_consumers"));
}
all.add(C(ServerConfig.class)//
.config(E("consumerServers").value("192.168.32.68:2281,192.168.32.68:2281"))//
.config(E("fileServer").value("192.168.32.68")));
......
......@@ -9,12 +9,12 @@ import com.dianping.cat.consumer.failure.model.entity.Entry;
import com.dianping.cat.consumer.failure.model.entity.FailureReport;
import com.dianping.cat.consumer.failure.model.entity.Segment;
import com.dianping.cat.consumer.failure.model.entity.Threads;
import com.dianping.cat.consumer.failure.model.transform.DefaultParser;
import com.dianping.cat.consumer.failure.model.transform.DefaultXmlParser;
public class FailureReportTool {
public static FailureReport parseXML(String xml) {
DefaultParser parser = new DefaultParser();
DefaultXmlParser parser = new DefaultXmlParser();
try {
return parser.parse(xml);
} catch (Exception e) {
......
......@@ -54,7 +54,7 @@ public class Handler implements PageHandler<Context> {
model.setAction(Action.VIEW);
model.setPage(ReportPage.FAILURE);
String domain = payload.getDomain();
String ip = payload.getIp();
......
......@@ -3,12 +3,12 @@ package com.dianping.cat.report.page.ip;
import java.util.List;
import com.dianping.cat.consumer.ip.model.entity.IpReport;
import com.dianping.cat.consumer.ip.model.transform.DefaultParser;
import com.dianping.cat.consumer.ip.model.transform.DefaultXmlParser;
public class IpReportTool {
public static IpReport parseXML(String xml) {
DefaultParser parser = new DefaultParser();
DefaultXmlParser parser = new DefaultXmlParser();
try {
return parser.parse(xml);
} catch (Exception e) {
......
......@@ -3,11 +3,11 @@ package com.dianping.cat.report.page.transaction;
import java.util.List;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultParser;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
public class TransactionReportTool {
public static TransactionReport parseXML(String xml) {
DefaultParser parser = new DefaultParser();
DefaultXmlParser parser = new DefaultXmlParser();
try {
return parser.parse(xml);
} catch (Exception e) {
......
package com.dianping.garden.view;
package com.dianping.cat.report.view;
import java.io.IOException;
import java.io.StringReader;
......
package com.dianping.garden.view;
package com.dianping.cat.report.view;
import com.dianping.cat.report.ReportPage;
import com.site.web.mvc.Page;
......
package com.dianping.garden.view;
package com.dianping.cat.report.view;
import com.site.web.mvc.Action;
import com.site.web.mvc.ActionContext;
......
......@@ -8,7 +8,6 @@
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hints>
<role-hint>realtime</role-hint>
<role-hint>dump-to-html</role-hint>
</role-hints>
<field-name>m_consumers</field-name>
</requirement>
......
......@@ -8,7 +8,7 @@
<tag>
<name>errors</name>
<tag-class>com.dianping.garden.view.ErrorsTag</tag-class>
<tag-class>com.dianping.cat.report.view.ErrorsTag</tag-class>
<body-content>JSP</body-content>
</tag>
<tag-file>
......@@ -22,21 +22,21 @@
<function>
<description>Build form action for given id</description>
<name>action</name>
<function-class>com.dianping.garden.view.UriBuilder</function-class>
<function-class>com.dianping.cat.report.view.UriBuilder</function-class>
<function-signature>String action(com.site.web.mvc.ViewModel, java.lang.Object)</function-signature>
<example>${a:action(model, 123)}</example>
</function>
<function>
<description>Build uri for given id</description>
<name>uri</name>
<function-class>com.dianping.garden.view.UriBuilder</function-class>
<function-class>com.dianping.cat.report.view.UriBuilder</function-class>
<function-signature>String uri(com.site.web.mvc.ViewModel, java.lang.Object)</function-signature>
<example>${a:uri(model, 123)}</example>
</function>
<function>
<description>Build uri for given id with additional query string</description>
<name>uri2</name>
<function-class>com.dianping.garden.view.UriBuilder</function-class>
<function-class>com.dianping.cat.report.view.UriBuilder</function-class>
<function-signature>String uri2(com.site.web.mvc.ViewModel, java.lang.Object, java.lang.String)</function-signature>
<example>${a:uri2(model, 123, 'a=1&amp;b=2')}</example>
</function>
......
<%@ tag trimDirectiveWhitespaces="true" %>
<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core"%>
<%@ taglib prefix="res" uri="http://www.unidal.org/webres"%>
<jsp:useBean id="navBar" class="com.dianping.garden.view.NavigationBar" scope="page"/>
<jsp:useBean id="navBar" class="com.dianping.cat.report.view.NavigationBar" scope="page"/>
<res:bean id="res"/>
<html>
......
......@@ -12,6 +12,7 @@
<module>cat-core</module>
<module>cat-consumer</module>
<module>cat-home</module>
<module>cat-hadoop</module>
</modules>
<dependencyManagement>
<dependencies>
......@@ -28,7 +29,7 @@
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<version>1.0.0</version>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
......
......@@ -39,16 +39,6 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>dianping.releases</id>
<url>http://192.168.8.45:8080/artifactory/dianping-releases</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>releases</id>
<url>http://tech-wuqim.dianpingoa.com/nexus/content/repositories/releases/</url>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册