提交 d3805c0c 编写于 作者: F Frankie Wu

implement more features

上级 b465f8ab
......@@ -8,7 +8,7 @@ import com.dianping.cat.message.spi.MessageAnalyzer;
*/
public interface AnalyzerFactory {
public MessageAnalyzer create(String name, long start, long duration, String domain, long extraTime);
public MessageAnalyzer create(String name, long start, long duration, long extraTime);
public void release(Object component);
......
package com.dianping.cat.consumer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.logview.LogViewPostHandler;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
......@@ -13,21 +14,26 @@ import com.site.lookup.ContainerHolder;
public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerFactory {
@Override
public MessageAnalyzer create(String name, long start, long duration, String domain, long extraTime) {
public MessageAnalyzer create(String name, long start, long duration, long extraTime) {
if (name.equals("problem")) {
ProblemAnalyzer analyzer = lookup(ProblemAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, domain, extraTime);
analyzer.setAnalyzerInfo(start, duration, extraTime);
return analyzer;
} else if (name.equals("transaction")) {
TransactionAnalyzer analyzer = lookup(TransactionAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, domain, extraTime);
analyzer.setAnalyzerInfo(start, duration, extraTime);
return analyzer;
} else if (name.equals("ip")) {
IpAnalyzer analyzer = lookup(IpAnalyzer.class);
return analyzer;
} else if (name.equals("logview")) {
LogViewPostHandler handler = lookup(LogViewPostHandler.class);
handler.initialize(start);
return handler;
}
throw new RuntimeException(String.format("No analyzer(%s) found!", name));
......
......@@ -5,16 +5,21 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.consumer.logview.LogViewPostHandler;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageQueue;
......@@ -24,8 +29,9 @@ import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
/**
* This is the real time consumer process framework. The config of the consumer
* contains name,domain,analyzers.
* This is the real time consumer process framework. All analyzers share the
* message decoding once, thereof reduce the overhead.
* <p>
*
* @author yong.you
* @since Jan 5, 2012
......@@ -35,8 +41,6 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private static final long MINUTE = 60 * 1000L;
private static final String DOMAIN_ALL = "all";
private static final int PROCESS_PERIOD = 3;
private static final long FIVE_MINUTES = 5 * 60 * 1000L;
......@@ -48,7 +52,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private String m_consumerId;
@Inject
private String m_domain = DOMAIN_ALL;
private List<String> m_eligibleDomains; // domains == null means not limit
@Inject
private long m_duration = 1 * HOUR;
......@@ -73,10 +77,13 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private Map<String, MessageAnalyzer> m_currentAnalyzers = new HashMap<String, MessageAnalyzer>();
private Set<String> m_domains = new HashSet<String>();
@Override
public void consume(MessageTree tree) {
if (!isInDomain(tree))
if (!isInDomain(tree)) {
return;
}
long timestamp = tree.getMessage().getTimestamp();
Period current = null;
......@@ -102,6 +109,8 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
m_logger.warn("The timestamp of message is out of range, IGNORED! \r\n" + tree);
}
}
m_domains.add(tree.getDomain());
}
private void distributeMessage(MessageTree tree, List<MessageQueue> queues) {
......@@ -141,11 +150,6 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
return m_currentAnalyzers.get(name);
}
@Override
public String getDomain() {
return m_domain;
}
public MessageAnalyzer getLastAnalyzer(String name) {
return m_lastAnalyzers.get(name);
}
......@@ -156,25 +160,25 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
}
private boolean isInDomain(MessageTree tree) {
if (m_domain == null || m_domain.length() == 0 || m_domain.equalsIgnoreCase(DOMAIN_ALL)) {
return true;
}
if (m_domain.indexOf(tree.getDomain()) > -1) {
if (m_eligibleDomains == null || m_eligibleDomains.isEmpty()) {
return true;
} else {
return m_eligibleDomains.contains(tree.getDomain());
}
return false;
}
public void setAnalyzerNames(String analyzerNames) {
m_analyzerNames = Splitters.by(',').noEmptyItem().trim().split(analyzerNames);
public void setAnalyzers(String analyzers) {
m_analyzerNames = Splitters.by(',').noEmptyItem().trim().split(analyzers);
}
public void setConsumerId(String consumerId) {
m_consumerId = consumerId;
}
public void setDomain(String domain) {
m_domain = domain;
public void setDomains(String domains) {
if (domains != null) {
m_eligibleDomains = Splitters.by(',').noEmptyItem().trim().split(domains);
}
}
public void setDuration(long duration) {
......@@ -199,15 +203,16 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
m_logger.info("Start Tasks At " + new Date(start));
List<MessageQueue> queues = new ArrayList<MessageQueue>();
Period current = new Period(start, start + m_duration, queues);
CountDownLatch latch = new CountDownLatch(m_analyzerNames.size());
m_lastAnalyzers.clear();
m_lastAnalyzers.putAll(m_currentAnalyzers);
m_currentAnalyzers.clear();
for (String name : m_analyzerNames) {
MessageAnalyzer analyzer = m_factory.create(name, start, m_duration, m_domain, m_extraTime);
MessageAnalyzer analyzer = m_factory.create(name, start, m_duration, m_extraTime);
MessageQueue queue = lookup(MessageQueue.class);
Task task = new Task(m_factory, analyzer, queue);
Task task = new Task(m_factory, analyzer, queue, latch);
queue.offer(tree);
queues.add(queue);
......@@ -215,6 +220,12 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
m_currentAnalyzers.put(name, analyzer);
}
LogViewPostHandler handler = (LogViewPostHandler) m_factory.create("logview", start, m_duration, m_extraTime);
handler.setDomains(m_domains);
m_executor.submit(new FinalizerTask(m_factory, handler, m_duration, latch));
m_currentAnalyzers.put("logview", handler);
int len = m_periods.size();
if (len >= PROCESS_PERIOD) {
......@@ -224,6 +235,40 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
m_periods.add(current);
}
static class FinalizerTask implements Runnable {
private AnalyzerFactory m_factory;
private MessageAnalyzer m_handler;
private long m_duration;
private CountDownLatch m_latch;
public FinalizerTask(AnalyzerFactory factory, MessageAnalyzer handler, long duration, CountDownLatch latch) {
m_factory = factory;
m_handler = handler;
m_duration = duration;
m_latch = latch;
}
@Override
public void run() {
try {
m_latch.await(m_duration * 2, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
System.out.println("Waiting time out in FinalizerTask, do logview upload next");
}
try {
m_handler.doCheckpoint();
} catch (IOException e) {
e.printStackTrace();
} finally {
m_factory.release(m_handler);
}
}
}
static class Period {
private long m_startTime;
......@@ -253,10 +298,13 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
private MessageQueue m_queue;
public Task(AnalyzerFactory factory, MessageAnalyzer analyzer, MessageQueue queue) {
private CountDownLatch m_latch;
public Task(AnalyzerFactory factory, MessageAnalyzer analyzer, MessageQueue queue, CountDownLatch latch) {
m_factory = factory;
m_analyzer = analyzer;
m_queue = queue;
m_latch = latch;
}
public MessageQueue getQueue() {
......@@ -267,6 +315,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
m_analyzer.analyze(m_queue);
m_factory.release(m_analyzer);
m_factory.release(m_queue);
m_latch.countDown();
}
}
}
\ No newline at end of file
......@@ -11,6 +11,7 @@ import com.dianping.cat.consumer.DefaultAnalyzerFactory;
import com.dianping.cat.consumer.DefaultMessageQueue;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.logview.LogViewPostHandler;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.problem.handler.FailureHandler;
import com.dianping.cat.consumer.problem.handler.Handler;
......@@ -35,7 +36,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.req(AnalyzerFactory.class).config(E("consumerId").value("realtime") //
, E("extraTime").value(property("extraTime", "300000"))//
, E("analyzerNames").value("problem,transaction,ip")));
, E("analyzers").value("problem,transaction,ip")));
String failureTypes = "Error,RuntimeException,Exception";
......@@ -54,6 +55,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(IpAnalyzer.class).is(PER_LOOKUP));
all.add(C(LogViewPostHandler.class).is(PER_LOOKUP) //
.req(BucketManager.class, MessagePathBuilder.class));
return all;
}
......
package com.dianping.cat.consumer.logview;
import java.io.IOException;
import java.util.Date;
import java.util.Set;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.dianping.cat.storage.internal.DefaultBucket;
import com.site.lookup.annotation.Inject;
public class LogViewPostHandler implements MessageAnalyzer {
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private BucketManager m_bucketManager;
private long m_startTime;
private Set<String> m_domains;
@Override
public void analyze(MessageQueue queue) {
throw new UnsupportedOperationException("This method should not be called!");
}
@Override
public void doCheckpoint() throws IOException {
for (String domain : m_domains) {
String path = m_pathBuilder.getMessagePath(domain, new Date(m_startTime));
try {
DefaultBucket<byte[]> localBucket = (DefaultBucket<byte[]>) m_bucketManager.getBytesBucket(path);
Bucket<byte[]> hdfsBucket = m_bucketManager.getHdfsBucket(path);
hdfsBucket.deleteAndCreate();
for (String id : localBucket.getIds()) {
byte[] data = localBucket.findById(id);
String[] tags = localBucket.findTagsById(id);
hdfsBucket.storeById(id, data, tags);
}
localBucket.close();
hdfsBucket.flush();
hdfsBucket.close();
} catch (IOException e) {
throw new RuntimeException(String.format(
"Error when copying data from local bucket to HDFS bucket for %s.", path), e);
}
}
}
public void initialize(long startTime) {
m_startTime = startTime;
}
public void setDomains(Set<String> domains) {
m_domains = domains;
}
}
......@@ -44,8 +44,6 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
private Map<String, ProblemReport> m_reports = new HashMap<String, ProblemReport>();
private Bucket<MessageTree> m_messageBucket;
private long m_extraTime;
private Logger m_logger;
......@@ -183,26 +181,21 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
String threadTag = "t:" + tree.getThreadId();
try {
m_messageBucket.storeById(messageId, tree, threadTag);
String path = m_pathBuilder.getMessagePath(domain, new Date(m_startTime));
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(path);
bucket.storeById(messageId, tree, threadTag);
} catch (IOException e) {
m_logger.error("Error when storing message for problem analyzer!", e);
}
}
}
public void setAnalyzerInfo(long startTime, long duration, String domain, long extraTime) {
public void setAnalyzerInfo(long startTime, long duration, long extraTime) {
m_extraTime = extraTime;
m_startTime = startTime;
m_duration = duration;
String path = m_pathBuilder.getMessagePath(new Date(m_startTime));
try {
m_messageBucket = m_bucketManager.getMessageBucket(path);
} catch (Exception e) {
throw new RuntimeException(String.format("Unable to create message bucket at %s.", path), e);
}
loadReports();
}
......@@ -212,7 +205,6 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
return;
}
m_bucketManager.closeBucket(m_messageBucket);
storeReports(reports);
}
......
......@@ -47,8 +47,6 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
private Map<String, TransactionReport> m_reports = new HashMap<String, TransactionReport>();
private Bucket<MessageTree> m_messageBucket;
private long m_extraTime;
private Logger m_logger;
......@@ -180,7 +178,10 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String threadTag = "t:" + tree.getThreadId();
try {
m_messageBucket.storeById(messageId, tree, threadTag);
String path = m_pathBuilder.getMessagePath(domain, new Date(m_startTime));
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(path);
bucket.storeById(messageId, tree, threadTag);
} catch (IOException e) {
m_logger.error("Error when storing message for transaction analyzer!", e);
}
......@@ -273,19 +274,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
range.setSum(range.getSum() + d);
}
public void setAnalyzerInfo(long startTime, long duration, String domain, long extraTime) {
public void setAnalyzerInfo(long startTime, long duration, long extraTime) {
m_extraTime = extraTime;
m_startTime = startTime;
m_duration = duration;
String path = m_pathBuilder.getMessagePath(new Date(m_startTime));
try {
m_messageBucket = m_bucketManager.getMessageBucket(path);
} catch (Exception e) {
throw new RuntimeException(String.format("Unable to create message bucket at %s.", path), e);
}
loadReports();
}
......@@ -295,7 +288,6 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
return;
}
m_bucketManager.closeBucket(m_messageBucket);
storeReports(reports);
}
......
......@@ -16,7 +16,7 @@
<configuration>
<consumerId>realtime</consumerId>
<extraTime>300000</extraTime>
<analyzerNames>problem,transaction,ip</analyzerNames>
<analyzers>problem,transaction,ip</analyzers>
</configuration>
<requirements>
<requirement>
......@@ -79,5 +79,18 @@
<implementation>com.dianping.cat.consumer.ip.IpAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.consumer.logview.LogViewPostHandler</role>
<implementation>com.dianping.cat.consumer.logview.LogViewPostHandler</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
</components>
</plexus>
......@@ -6,17 +6,17 @@ import com.dianping.cat.consumer.ManyAnalyzerTest.MockAnalyzer2;
import com.dianping.cat.consumer.ManyAnalyzerTest.MockAnalyzer3;
import com.dianping.cat.message.spi.MessageAnalyzer;
public class ManyAnalyerMockFactory implements AnalyzerFactory{
public class ManyAnalyerMockFactory implements AnalyzerFactory {
public MessageAnalyzer create(String name, long start, long duration ,String domain ,long extraTime) {
public MessageAnalyzer create(String name, long start, long duration, long extraTime) {
if (name.equals("mock1")) {
MockAnalyzer1 analyzer = new MockAnalyzer1();
MockAnalyzer1 analyzer = new MockAnalyzer1();
return analyzer;
} else if(name.equals("mock2")) {
MockAnalyzer2 analyzer = new MockAnalyzer2();
} else if (name.equals("mock2")) {
MockAnalyzer2 analyzer = new MockAnalyzer2();
return analyzer;
} else if (name.equals("mock3")) {
MockAnalyzer3 analyzer = new MockAnalyzer3();
MockAnalyzer3 analyzer = new MockAnalyzer3();
return analyzer;
}
return null;
......@@ -24,6 +24,6 @@ public class ManyAnalyerMockFactory implements AnalyzerFactory{
@Override
public void release(Object component) {
}
}
......@@ -4,17 +4,17 @@ import com.dianping.cat.consumer.AnalyzerFactory;
import com.dianping.cat.consumer.OneAnalyzerTwoDurationTest.MockAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
public class OneAnalyzerMockFactory implements AnalyzerFactory{
public MessageAnalyzer create(String name, long start, long duration ,String domain ,long extraTime) {
public class OneAnalyzerMockFactory implements AnalyzerFactory {
public MessageAnalyzer create(String name, long start, long duration, long extraTime) {
if (name.equals("mock")) {
MockAnalyzer analyzer = new OneAnalyzerTwoDurationTest.MockAnalyzer();
MockAnalyzer analyzer = new OneAnalyzerTwoDurationTest.MockAnalyzer();
return analyzer;
}
}
return null;
}
@Override
public void release(Object component) {
}
}
......@@ -22,11 +22,12 @@ import com.site.lookup.ComponentTestCase;
* @since Jan 5, 2012
*/
@RunWith(JUnit4.class)
public class TransactionReportMessageAnalyzerTest extends ComponentTestCase{
public class TransactionReportMessageAnalyzerTest extends ComponentTestCase {
/**
* Test method for {@link com.dianping.cat.consumer.transaction.TransactionReportAnalyzer#process(com.dianping.cat.message.spi.MessageTree)}.
* Test method for
* {@link com.dianping.cat.consumer.transaction.TransactionReportAnalyzer#process(com.dianping.cat.message.spi.MessageTree)}
* .
*
* @throws InterruptedException
*/
......@@ -35,11 +36,10 @@ public class TransactionReportMessageAnalyzerTest extends ComponentTestCase{
long current = System.currentTimeMillis();
long duration = 60 * 60 * 1000;
long extraTime = 5 * 60 * 1000;
long start = current - current % (60 * 60 * 1000) -1000L*60*60;
long start = current - current % (60 * 60 * 1000) - 1000L * 60 * 60;
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
TransactionAnalyzer analyzer = (TransactionAnalyzer) factory.create("transaction", start, duration, "testDomain",
extraTime);
TransactionAnalyzer analyzer = (TransactionAnalyzer) factory.create("transaction", start, duration, extraTime);
for (int i = 1; i <= 1000; i++) {
MessageTree tree = new DefaultMessageTree();
......@@ -92,8 +92,8 @@ public class TransactionReportMessageAnalyzerTest extends ComponentTestCase{
assertEquals(500.5, n2.getAvg());
assertEquals(500500.0, n2.getSum());
// System.out.println(new DefaultJsonBuilder().buildJson(report));
// System.out.println(report.toString());
// System.out.println(new DefaultJsonBuilder().buildJson(report));
// System.out.println(report.toString());
}
}
......@@ -3,7 +3,5 @@ package com.dianping.cat.message.spi;
public interface MessageConsumer {
public String getConsumerId();
public String getDomain();
public void consume(MessageTree tree);
}
......@@ -13,7 +13,7 @@ public interface MessagePathBuilder {
public String getLogViewPath(String messageId);
public String getMessagePath(Date timestamp);
public String getMessagePath(String domain, Date timestamp);
public String getReportPath(Date timestamp);
}
......@@ -11,11 +11,6 @@ public class DummyConsumer implements MessageConsumer {
return ID;
}
@Override
public String getDomain() {
return null;
}
@Override
public void consume(MessageTree tree) {
// Do nothing here
......
......@@ -44,11 +44,6 @@ public class DumpToHtmlConsumer implements MessageConsumer, Initializable, LogEn
return ID;
}
@Override
public String getDomain() {
return m_domain;
}
@Override
public void initialize() throws InitializationException {
File baseDir = m_builder.getLogViewBaseDir();
......
......@@ -66,10 +66,10 @@ public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializa
}
@Override
public String getMessagePath(Date timestamp) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/message");
public String getMessagePath(String domain, Date timestamp) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/message-{1}");
return format.format(new Object[] { timestamp });
return format.format(new Object[] { timestamp, domain });
}
@Override
......
......@@ -24,10 +24,4 @@ public interface Bucket<T> {
public T findNextById(String id, String tag) throws IOException;;
public T findPreviousById(String id, String tag) throws IOException;;
public static enum Direction {
FORWARD,
BACKWARD;
}
}
......@@ -108,6 +108,11 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
}
}
public String[] findTagsById(String id) {
// TODO
return new String[0];
}
@Override
public T findById(String id) {
Long offset = m_idToOffsets.get(id);
......@@ -335,4 +340,11 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
}
}
}
public static enum Direction {
FORWARD,
BACKWARD;
}
}
<?xml version="1.0" encoding="utf-8"?>
<data-sources>
<data-source id="cat">
<maximum-pool-size>3</maximum-pool-size>
<connection-timeout>1s</connection-timeout>
<idle-timeout>10m</idle-timeout>
<statement-cache-size>1000</statement-cache-size>
<properties>
<driver>com.mysql.jdbc.Driver</driver>
<url><![CDATA[jdbc:mysql://localhost:3306/cat]]></url>
<user>root</user>
<password>123456</password>
<connectionProperties><![CDATA[useUnicode=true&autoReconnect=true]]></connectionProperties>
</properties>
</data-source>
</data-sources>
......@@ -9,7 +9,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-home</artifactId>
<name>CAT Home</name>
<packaging>war</packaging>
<packaging>${packaging}</packaging>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
......@@ -54,7 +54,7 @@
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<scope>test</scope>
<scope>${test-framework.scope}</scope>
</dependency>
</dependencies>
<build>
......@@ -104,6 +104,25 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<index>true</index>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.dianping.cat.Server</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<packaging>war</packaging>
<test-framework.scope>test</test-framework.scope>
</properties>
</project>
package com.dianping.cat;
import java.io.File;
import org.codehaus.plexus.PlexusContainer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mortbay.jetty.Handler;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.servlet.GzipFilter;
import org.unidal.webres.server.support.SimpleServerSupport;
import org.unidal.webres.taglib.support.JettyTestSupport;
import com.dianping.cat.servlet.CatServlet;
import com.site.lookup.ComponentTestCase;
import com.site.web.MVC;
public class Server extends SimpleServerSupport {
private static ComponentAdaptor s_adaptor = new ComponentAdaptor();
private static MVC s_mvc = new MVC();
private static CatServlet s_cat = new CatServlet();
@AfterClass
public static void afterClass() throws Exception {
JettyTestSupport.shutdownServer();
}
@BeforeClass
public static void beforeClass() throws Exception {
JettyTestSupport.startServer(new Server());
}
public static void main(String[] args) throws Exception {
Server server = new Server();
Server.beforeClass();
try {
server.before();
server.startServer();
server.after();
} finally {
Server.shutdownServer();
}
}
@Override
public void after() {
super.after();
s_adaptor.after();
}
@Override
public void before() {
s_adaptor.before();
s_mvc.setContainer(s_adaptor.getContainer());
super.before();
}
@Override
protected String getContextPath() {
return "/cat";
}
@Override
protected File getScratchDir() {
File work = new File(System.getProperty("java.io.tmpdir", "."), "Cat");
work.mkdirs();
return work;
}
@Override
protected int getServerPort() {
return 2281;
}
@Override
protected File getWarRoot() {
return new File("src/main/webapp");
}
@Override
protected void postConfigure(Context ctx) {
ServletHolder holder = new ServletHolder(s_mvc);
ctx.addServlet(new ServletHolder(s_cat), "/s/*");
ctx.addServlet(holder, "/");
ctx.addServlet(holder, "/r/*");
ctx.addFilter(GzipFilter.class, "/r/*", Handler.ALL);
super.postConfigure(ctx);
}
@Test
public void startServer() throws Exception {
System.out.println(String.format("[%s] Press any key to stop server ... ", getTimestamp()));
System.in.read();
}
static class ComponentAdaptor extends ComponentTestCase {
public void after() {
try {
super.tearDown();
} catch (Exception e) {
e.printStackTrace();
}
}
public void before() {
try {
super.setUp();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public PlexusContainer getContainer() {
return super.getContainer();
}
@Override
public <T> T lookup(Class<T> role) throws Exception {
return super.lookup(role);
}
@Override
public <T> T lookup(Class<T> role, Object roleHint) throws Exception {
return super.lookup(role, roleHint);
}
}
}
......@@ -39,6 +39,8 @@ public class ReportContext<T extends ActionPayload<? extends Page, ? extends Act
new ResourceConfigurator().configure(registry);
new ResourceTagConfigurator().configure(registry);
new ResourceTagLibConfigurator().configure(registry);
registry.lock();
}
ResourceRuntimeContext.setup(contextPath);
......
......@@ -7,8 +7,10 @@ import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.report.page.model.logview.CompositeLogViewService;
import com.dianping.cat.report.page.model.logview.HdfsLogViewService;
import com.dianping.cat.report.page.model.logview.LocalLogViewService;
import com.dianping.cat.report.page.model.problem.CompositeProblemService;
import com.dianping.cat.report.page.model.problem.HdfsProblemService;
import com.dianping.cat.report.page.model.problem.LocalProblemService;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.report.page.model.transaction.CompositeTransactionService;
......@@ -32,14 +34,20 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
all.add(C(ModelService.class, "problem-local", LocalProblemService.class) //
.req(MessageConsumer.class, "realtime"));
all.add(C(ModelService.class, "problem-hdfs", HdfsProblemService.class) //
.req(BucketManager.class, MessagePathBuilder.class));
all.add(C(ModelService.class, "problem", CompositeProblemService.class) //
.req(ModelService.class, new String[] { "problem-local" }, "m_services"));
.req(ModelService.class, new String[] { "problem-local", "problem-hdfs" }, "m_services"));
all.add(C(ModelService.class, "logview-local", LocalLogViewService.class) //
.req(MessagePathBuilder.class, BucketManager.class) //
.req(MessageCodec.class, "html"));
all.add(C(ModelService.class, "logview-hdfs", HdfsLogViewService.class) //
.req(MessagePathBuilder.class, BucketManager.class) //
.req(MessageCodec.class, "html", "m_htmlCodec") //
.req(MessageCodec.class, "plain-text", "m_plainCodec"));
all.add(C(ModelService.class, "logview", CompositeLogViewService.class) //
.req(ModelService.class, new String[] { "logview-local" }, "m_services"));
.req(ModelService.class, new String[] { "logview-local", "logview-hdfs" }, "m_services"));
return all;
}
......
package com.dianping.cat.report.page.model.logview;
import java.nio.charset.Charset;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HdfsLogViewService implements ModelService<String> {
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private BucketManager m_bucketManager;
@Inject(value = "plain-text")
private MessageCodec m_plainDecode;
@Inject(value = "html")
private MessageCodec m_htmlCodec;
@Override
public ModelResponse<String> invoke(ModelRequest request) {
String messageId = request.getProperty("messageId");
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
String path = m_pathBuilder.getMessagePath(id.getDomain(), new Date(id.getTimestamp()));
ModelResponse<String> response = new ModelResponse<String>();
try {
Bucket<byte[]> bucket = m_bucketManager.getHdfsBucket(path);
byte[] data = null;
if (tag != null && direction != null) {
Boolean d = Boolean.valueOf(direction);
if (d.booleanValue()) {
data = bucket.findNextById(messageId, tag);
} else {
data = bucket.findPreviousById(messageId, tag);
}
}
// if not found, use current instead
if (data == null) {
data = bucket.findById(messageId);
}
if (data != null) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
MessageTree tree = new DefaultMessageTree();
buf.writeBytes(data);
m_plainDecode.decode(buf, tree);
buf.resetReaderIndex();
buf.resetWriterIndex();
m_htmlCodec.encode(tree, buf);
buf.readInt(); // get rid of length
response.setModel(buf.toString(Charset.forName("utf-8")));
}
} catch (Exception e) {
response.setException(e);
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
return request.getPeriod().isHistorical();
}
}
......@@ -33,7 +33,7 @@ public class LocalLogViewService implements ModelService<String> {
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
String path = m_pathBuilder.getMessagePath(new Date(id.getTimestamp()));
String path = m_pathBuilder.getMessagePath(id.getDomain(), new Date(id.getTimestamp()));
ModelResponse<String> response = new ModelResponse<String>();
try {
......@@ -55,11 +55,13 @@ public class LocalLogViewService implements ModelService<String> {
tree = bucket.findById(messageId);
}
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8096);
if (tree != null) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8096);
m_codec.encode(tree, buf);
buf.readInt(); // get rid of length
response.setModel(buf.toString(Charset.forName("utf-8")));
m_codec.encode(tree, buf);
buf.readInt(); // get rid of length
response.setModel(buf.toString(Charset.forName("utf-8")));
}
} catch (Exception e) {
response.setException(e);
}
......
package com.dianping.cat.report.page.model.problem;
import java.util.Date;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HdfsProblemService implements ModelService<ProblemReport> {
@Inject
private BucketManager m_bucketManager;
@Inject
private MessagePathBuilder m_pathBuilder;
@Override
public ModelResponse<ProblemReport> invoke(ModelRequest request) {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
String path = m_pathBuilder.getReportPath(new Date(date));
ModelResponse<ProblemReport> response = new ModelResponse<ProblemReport>();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getStringBucket(path);
String xml = bucket.findById("problem-" + domain);
if (xml == null) {
ProblemReport report = new DefaultXmlParser().parse(xml);
response.setModel(report);
}
} catch (Exception e) {
response.setException(e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
}
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
return request.getPeriod().isHistorical();
}
}
Manifest-Version: 1.0
Class-Path:
Main-Class: com.dianping.cat.Server
......@@ -88,6 +88,19 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>problem-hdfs</role-hint>
<implementation>com.dianping.cat.report.page.model.problem.HdfsProblemService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>problem</role-hint>
......@@ -97,6 +110,7 @@
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hints>
<role-hint>problem-local</role-hint>
<role-hint>problem-hdfs</role-hint>
</role-hints>
<field-name>m_services</field-name>
</requirement>
......@@ -119,6 +133,29 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>logview-hdfs</role-hint>
<implementation>com.dianping.cat.report.page.model.logview.HdfsLogViewService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>html</role-hint>
<field-name>m_htmlCodec</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
<field-name>m_plainCodec</field-name>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hint>logview</role-hint>
......@@ -128,6 +165,7 @@
<role>com.dianping.cat.report.page.model.spi.ModelService</role>
<role-hints>
<role-hint>logview-local</role-hint>
<role-hint>logview-hdfs</role-hint>
</role-hints>
<field-name>m_services</field-name>
</requirement>
......
......@@ -51,6 +51,13 @@
<tag-class>org.unidal.webres.taglib.basic.UseCssTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[Set the css value with EL or a css ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the link URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -72,13 +79,6 @@
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[Set the css value with EL or a css ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Target placement for this css resource to render]]></description>
<name>target</name>
<required>false</required>
......@@ -114,18 +114,18 @@
<tag-class>org.unidal.webres.taglib.basic.SetTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[The name.]]></description>
<name>id</name>
<description><![CDATA[The value]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.String</type>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[The value]]></description>
<name>value</name>
<description><![CDATA[The name.]]></description>
<name>id</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
<type>java.lang.String</type>
</attribute>
<dynamic-attributes>false</dynamic-attributes>
</tag>
......@@ -135,6 +135,13 @@
<tag-class>org.unidal.webres.taglib.basic.UseJsTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[Set the js value with EL or a js ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the link URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -156,13 +163,6 @@
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[Set the js value with EL or a js ref.]]></description>
<name>value</name>
<required>false</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Target placement for this js resource to render]]></description>
<name>target</name>
<required>false</required>
......@@ -177,6 +177,13 @@
<tag-class>org.unidal.webres.taglib.basic.LinkTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[The value for link, could be a expression or a link ref.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the link URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -190,13 +197,6 @@
<rtexprvalue>true</rtexprvalue>
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[The value for link, could be a expression or a link ref.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<dynamic-attributes>true</dynamic-attributes>
</tag>
<tag>
......@@ -205,6 +205,13 @@
<tag-class>org.unidal.webres.taglib.basic.ImageTagHandler</tag-class>
<body-content>JSP</body-content>
<attribute>
<description><![CDATA[The value for image, could be a expression or a image path.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<attribute>
<description><![CDATA[Identify whether the image URL is secure or not.]]></description>
<name>secure</name>
<required>false</required>
......@@ -225,13 +232,6 @@
<rtexprvalue>true</rtexprvalue>
<type>java.lang.String</type>
</attribute>
<attribute>
<description><![CDATA[The value for image, could be a expression or a image path.]]></description>
<name>value</name>
<required>true</required>
<rtexprvalue>true</rtexprvalue>
<type>java.lang.Object</type>
</attribute>
<dynamic-attributes>true</dynamic-attributes>
</tag>
<tag>
......
......@@ -19,7 +19,7 @@ import com.site.lookup.ComponentTestCase;
import com.site.test.browser.BrowserManager;
import com.site.web.MVC;
public class SimpleServer extends SimpleServerSupport {
public class TestServer extends SimpleServerSupport {
private static ComponentAdaptor s_adaptor = new ComponentAdaptor();
private static MVC s_mvc = new MVC();
......@@ -33,20 +33,20 @@ public class SimpleServer extends SimpleServerSupport {
@BeforeClass
public static void beforeClass() throws Exception {
JettyTestSupport.startServer(new SimpleServer());
JettyTestSupport.startServer(new Server());
}
public static void main(String[] args) throws Exception {
SimpleServer server = new SimpleServer();
Server server = new Server();
SimpleServer.beforeClass();
Server.beforeClass();
try {
server.before();
server.startServer();
server.after();
} finally {
SimpleServer.shutdownServer();
Server.shutdownServer();
}
}
......
<?xml version="1.0" encoding="utf-8"?>
<data-sources>
<data-source id="cat">
<maximum-pool-size>3</maximum-pool-size>
<connection-timeout>1s</connection-timeout>
<idle-timeout>10m</idle-timeout>
<statement-cache-size>1000</statement-cache-size>
<properties>
<driver>com.mysql.jdbc.Driver</driver>
<url><![CDATA[jdbc:mysql://localhost:3306/cat]]></url>
<user>root</user>
<password>123456</password>
<connectionProperties><![CDATA[useUnicode=true&autoReconnect=true]]></connectionProperties>
</properties>
</data-source>
</data-sources>
......@@ -26,11 +26,6 @@ public class HdfsDumpConsumer implements MessageConsumer {
return ID;
}
@Override
public String getDomain() {
return m_domain;
}
public void setDomain(String domain) {
m_domain = domain;
}
......
......@@ -11,7 +11,7 @@ final class DatabaseConfigurator extends AbstractJdbcResourceConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(defineJdbcDataSourceConfigurationManagerComponent("datasources.xml"));
all.add(defineJdbcDataSourceConfigurationManagerComponent("/data/appdatas/cat/datasources.xml"));
all.add(defineJdbcDataSourceComponent("cat", "${jdbc.driver}", "${jdbc.url}", "${jdbc.user}", "${jdbc.password}",
"<![CDATA[${jdbc.connectionProperties}]]>"));
......
......@@ -54,7 +54,7 @@
<role>com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager</role>
<implementation>com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager</implementation>
<configuration>
<datasourceFile>datasources.xml</datasourceFile>
<datasourceFile>/data/appdatas/cat/datasources.xml</datasourceFile>
</configuration>
</component>
<component>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册