提交 a8fb2ee4 编写于 作者: F Frankie Wu

Merge branch 'master' of ssh://192.168.8.22:58422/cat

......@@ -2,3 +2,4 @@
/target/*
/*/target/*
/*/.*
.*
......@@ -10,6 +10,11 @@
<artifactId>cat-consumer</artifactId>
<name>CAT Consumer</name>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......@@ -19,4 +24,25 @@
<artifactId>lookup</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.0</version>
<executions>
<execution>
<id>default-cli</id>
<phase>generate-sources</phase>
<goals>
<goal>dal-model</goal>
</goals>
<configuration>
<manifest>${basedir}/src/main/resources/META-INF/dal/model/failure-report-manifest.xml</manifest>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.dianping.cat.consumer.configuration;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.consumer.impl2.FailureReportAnalyzer;
import com.dianping.cat.consumer.impl2.RealtimeConsumer;
import com.dianping.cat.message.consumer.impl.DefaultMessageQueue;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageQueue;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(MessageQueue.class, DefaultMessageQueue.class).is(PER_LOOKUP));
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.config(E("consumerId").value("realtime") //
, E("domain").value("Review") //
, E("analyzerNames").value("failure-report") //
));
all.add(C(MessageAnalyzer.class, "failure-report",
FailureReportAnalyzer.class) //
.is(PER_LOOKUP));
return all;
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
}
package com.dianping.cat.consumer.impl2;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageQueue;
public class FailureReportAnalyzer implements MessageAnalyzer {
@Override
public void analyze(MessageQueue queue) {
// TODO Auto-generated method stub
}
}
package com.dianping.cat.consumer.impl2;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageQueue;
public class PeriodicTask implements Runnable {
private long m_startTime;
private long m_duration;
private MessageAnalyzer m_analyzer;
private MessageQueue m_queue;
public PeriodicTask(long startTime, long duration,
MessageAnalyzer analyzer, MessageQueue queue) {
}
public void run() {
m_analyzer.analyze(m_queue);
}
public long getDuration() {
return m_duration;
}
public void setDuration(long duration) {
m_duration = duration;
}
public MessageQueue getQueue() {
return m_queue;
}
public void setStartTime(long startTime) {
m_startTime = startTime;
}
}
\ No newline at end of file
package com.dianping.cat.consumer.impl2;
import java.util.ArrayList;
import java.util.List;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
import com.site.helper.Splitters;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class RealtimeConsumer extends ContainerHolder implements
MessageConsumer, Initializable {
@Inject
private String m_consumerId;
@Inject
private String m_domain;
@Inject
private long m_duration = 3600 * 1000L; // 1 hour
@Inject
private List<String> m_analyzerNames;
private List<PeriodicTask> m_tasks;
@Override
public void consume(MessageTree tree) {
// TODO Auto-generated method stub
}
@Override
public String getConsumerId() {
return m_consumerId;
}
@Override
public String getDomain() {
return m_domain;
}
@Override
public void initialize() throws InitializationException {
long startTime = 0; // TODO
m_tasks = new ArrayList<PeriodicTask>();
for (String name : m_analyzerNames) {
MessageAnalyzer analyzer = lookup(MessageAnalyzer.class, name);
MessageQueue queue = lookup(MessageQueue.class);
PeriodicTask task = new PeriodicTask(startTime, m_duration,
analyzer, queue);
m_tasks.add(task);
}
for (PeriodicTask task : m_tasks) {
Thread thread =new Thread(task);
thread.start();
}
}
public void setAnalyzerNames(String analyzerNames) {
m_analyzerNames = Splitters.by(',').noEmptyItem().trim()
.split(analyzerNames);
}
public void setConsumerId(String consumerId) {
m_consumerId = consumerId;
}
public void setDomain(String domain) {
m_domain = domain;
}
}
package com.dianping.cat.message.consumer.failure;
import com.dianping.cat.consumer.failurereport.entity.FailureReport;
import com.dianping.cat.consumer.failurereport.transform.DefaultJsonBuilder;
public class FailJSONReportStore implements FailReportStore{
@Override
public void storeFailureReport(FailureReport report) {
DefaultJsonBuilder jsonBuilder = new DefaultJsonBuilder();
jsonBuilder.visitFailureReport(report);
System.out.println(jsonBuilder.getString());
}
}
package com.dianping.cat.message.consumer.failure;
import com.dianping.cat.consumer.failurereport.entity.FailureReport;
public interface FailReportStore {
/**
* Store the total report
* @param report
*/
public void storeFailureReport(FailureReport report);
}
package com.dianping.cat.message.consumer.failure;
import java.io.IOException;
import org.xml.sax.SAXException;
import com.dianping.cat.consumer.failurereport.entity.FailureReport;
import com.dianping.cat.consumer.failurereport.transform.DefaultParser;
import com.site.helper.Files;
public class FailReportXMLStore implements FailReportStore {
@Override
public void storeFailureReport(FailureReport report) {
String xml;
try {
xml = Files.forIO().readFrom(
getClass().getResourceAsStream("/logView.xml"), "utf-8");
report = new DefaultParser().parse(xml);
System.out.println(report);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SAXException se) {
se.printStackTrace();
}
}
}
package com.dianping.cat.message.consumer.failure;
import com.dianping.cat.consumer.failurereport.entity.FailureReport;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessageTree;
public class FailureReportMessageAnalyzer extends AbstractMessageAnalyzer<FailureReport>{
@Override
protected void store(FailureReport result) {
// TODO Auto-generated method stub
}
@Override
public FailureReport generate() {
// TODO Auto-generated method stub
return null;
}
@Override
protected void process(MessageTree tree) {
// TODO Auto-generated method stub
}
}
package com.dianping.cat.message.consumer.failure;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.dianping.cat.consumer.failurereport.entity.Entry;
import com.dianping.cat.consumer.failurereport.entity.FailureReport;
import com.dianping.cat.consumer.failurereport.entity.Segment;
import com.dianping.cat.message.Message;
/**
* The class is used to record the state of the system. The is only record the
* lastest one hour.
*
* @author yong.you
*
*/
public class FailureState {
private long m_crrentTime;
private FailureReport m_report;
private List<FailReportStore> m_outputList;
private static final long HOUR = 60 * 60 * 1000;
private static final int LENGTH = 60;
private static final SimpleDateFormat SDF=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
public FailureState() {
m_crrentTime = 0;
}
public String getTimeStr(Message message){
Calendar cal=Calendar.getInstance();
cal.setTimeInMillis(message.getTimestamp());
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
Date date=new Date();
date.setTime(cal.getTimeInMillis());
try {
return SDF.format(date);
} catch (Exception e) {
return null;
}
}
/**
* -1 in last hour, 0 in current hour, 1 in next hour, 2 in next many hours
*
* @param message
* @return
*/
public int isInDuration(Message message) {
/*long time = message.getTimestamp();
Date startDate = m_failureReport.getStartTime();
long startTime = startDate.getTime();
if (time < startTime) {
return -1;
}
if (time >= startTime && time < startTime + HOUR) {
return 0;
}
if (time < startTime + HOUR * 2 && time >= startTime + HOUR) {
return 1;
}*/
return 2;
}
/**
* 获取到Message的时间段。
* 如果是新的时间段,则新建对象,否则更新对象。
* 新建对象之前需要将前60分钟的对象,存入历史数据。
* 更新machines信息。
* @param message
*/
public void addMessage(Message message) {
Entry entry =convertToEntry(message);
}
public void registerOutput(FailReportStore output) {
m_outputList.add(output);
}
public Entry convertToEntry(Message message) {
//TODO
Entry entry = new Entry();
entry.setText(message.getName());
entry.setType(message.getType());
return entry;
}
}
package com.dianping.cat.message.consumer.impl;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageQueue implements MessageQueue {
private long m_duration;
private long m_start;
private long m_end;
private Queue<MessageTree> queue = new LinkedBlockingQueue<MessageTree>();
public DefaultMessageQueue(int minutes,long start){
m_start = start;
m_end = start + 60*60*1000;
m_duration =minutes*60 *1000;
}
@Override
public boolean isActive() {
if(queue.size()>0){
return false;
}
return isExpired();
}
public boolean isExpired() {
long currentTime = System.currentTimeMillis();
if(currentTime - m_end > m_duration){
return true;
}
return false;
}
@Override
public MessageTree poll() {
return queue.poll();
}
@Override
public void offer(MessageTree tree) {
queue.add(tree);
}
public boolean inRange(MessageTree tree){
long time = tree.getMessage().getTimestamp();
if(time<m_end&&time>=m_start){
return true;
}
return false;
}
public long getStart() {
return m_start;
}
@Override
public int size() {
return 0;
}
}
package com.dianping.cat.message.consumer.impl;
import java.util.List;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageTree;
public class RealtimeConsumer implements MessageConsumer{
public List<RealtimeTask> tasks;
@Override
public String getConsumerId() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getDomain() {
// TODO Auto-generated method stub
return null;
}
@Override
public void consume(MessageTree tree) {
for(RealtimeTask task:tasks){
task.consume(tree);
}
}
}
package com.dianping.cat.message.consumer.impl;
public class RealtimeConsumerConfig {
private static RealtimeConsumerConfig config;
private String m_domain;
private int m_queueTime;
private int m_duration;
private String m_consumerName;
private String anaylyzerClassName;
private static final String DEFAULT_CONFIG = "realtimeConsumer.property";
private static boolean IS_DEFAULT = true;
public static synchronized RealtimeConsumerConfig getConfig() {
if (config == null) {
}
return config;
}
public static synchronized RealtimeConsumerConfig getConfig(String fileName) {
if (config == null) {
}
return config;
}
public boolean containsDomain(String domain){
if(IS_DEFAULT)
return true;
return true;
}
public String getDomain() {
return m_domain;
}
public int getQueueTime() {
return m_queueTime;
}
public int getDuration() {
return m_duration;
}
public String getConsumerName() {
return m_consumerName;
}
}
package com.dianping.cat.message.consumer.impl;
import org.apache.log4j.Logger;
import com.dianping.cat.message.consumer.failure.FailureReportMessageAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
/**
* The consumer is used to record the error and exception state of the system。
*
* The consumer records the lasteast info of one hour。
*
* 考虑问题: 1、Consumer能支持domain进行拆分部署多台物理机。 2、不需要考虑内存数据的稳定性,报表按照每个小时出一份。
* 3、需要考虑Consumer不能影响到其他的程序,添加一个Message需要立即返回。
* 4、需要考虑后续处理Consumer的线程稳定性,尽量使用一个线程处理一段时间内的东西并且返回。
* duration、Queue可以尽量分为3个类型,上一个队列,正在处理的队列,下一个队列,他们可以循环利用。
* 6、内存中一个小时的数据存放问题,提供Service给外界访问。 7、启动线程更新报表的时钟KEY信息。
*
* @author yong.you
*
*/
public class RealtimeTask {
private static Logger logger = Logger.getLogger(RealtimeTask.class);
private RealtimeConsumerConfig m_config;
private DefaultMessageQueue m_firstQueue;
private DefaultMessageQueue m_secondQueue;
//传进来业务自己实现的MessageAnalyzer的Class类。
@Inject
private String className;
public RealtimeTask(RealtimeConsumerConfig config) {
long currentTimeMillis = System.currentTimeMillis();
long lastHour = currentTimeMillis - currentTimeMillis% m_config.getQueueTime();
m_firstQueue = new DefaultMessageQueue(m_config.getDuration(), lastHour);
m_secondQueue = new DefaultMessageQueue(m_config.getDuration(), lastHour+m_config.getQueueTime());
startThread(m_firstQueue);
startThread(m_secondQueue);
}
/**
* 将MessageTree放入Queue中。 将下一个小时MessageTree放入下一个Queue,new一个Queue,new一个Thread。
*/
public void consume(MessageTree tree) {
if(m_firstQueue.isExpired()){
switchQueue();
consume(tree);
}
else{
if(m_firstQueue.inRange(tree)){
m_firstQueue.offer(tree);
}
else if(m_secondQueue.inRange(tree)){
m_secondQueue.offer(tree);
}
else {
logger.error("Discard it "+ tree);
}
}
}
public void switchQueue(){
long secondQueueTime = m_secondQueue.getStart();
m_firstQueue = m_secondQueue;
m_secondQueue = new DefaultMessageQueue(m_config.getDuration(), secondQueueTime+m_config.getQueueTime());
startThread(m_secondQueue);
}
private void startThread(final MessageQueue queue){
// 根据传进来的ClassName生成一个实例
final FailureReportMessageAnalyzer analyzer = new FailureReportMessageAnalyzer();
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
analyzer.analyze(queue);
}
});
thread.start();
}
}
package com.dianping.cat.tools;
public class DateUtil {
}
<model>
<entity name="failure-report" root="true">
<attribute name="domain" value-type="String" />
<attribute name="startTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<attribute name="endTime" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<entity-ref name="machines" />
<entity-ref name="segment" map="true" map-name="segments" />
</entity>
<entity name="machines">
<element name="machine" value-type="String" list="true" list-name="machines" />
</entity>
<entity name="segment">
<attribute name="timestamp" value-type="Date" format="yyyy-MM-dd HH:mm:ss" />
<entity-ref name="entry" list="true" list-name="entries" />
</entity>
<entity name="entry">
<attribute name="type" value-type="String" />
<attribute name="messageId" value-type="int" />
<attribute name="threadId" value-type="int" />
<element name="text" value-type="String" text="true" />
</entity>
</model>
\ No newline at end of file
<manifest>
<file path="failure-report-codegen.xml"/>
<file path="failure-report.xml"/>
</manifest>
\ No newline at end of file
<model model-package="com.dianping.cat.consumer.failurereport"
enable-merger="true" enable-json-builder="true" enable-xml-parser="true" enable-base-visitor="true">
<entity name="failure-report" root="true">
<entity-ref name="segment" map="true" map-name="segments" />
</entity>
<entity name="segment">
<attribute name="id" value-type="String" key="true" />
</entity>
</model>
\ No newline at end of file
<plexus>
<components>
<component>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<implementation>com.dianping.cat.message.consumer.impl.DefaultMessageQueue</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>realtime</role-hint>
<implementation>com.dianping.cat.consumer.impl2.RealtimeConsumer</implementation>
<configuration>
<consumerId>realtime</consumerId>
<domain>Review</domain>
<analyzerNames>failure-report</analyzerNames>
</configuration>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageAnalyzer</role>
<role-hint>failure-report</role-hint>
<implementation>com.dianping.cat.consumer.impl2.FailureReportAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
</components>
</plexus>
package com.dianping.cat.message.consumer.failurereport;
import java.io.IOException;
import org.junit.Test;
import org.xml.sax.SAXException;
import com.dianping.cat.consumer.failurereport.entity.FailureReport;
import com.dianping.cat.consumer.failurereport.transform.DefaultJsonBuilder;
import com.dianping.cat.consumer.failurereport.transform.DefaultParser;
import com.site.helper.Files;
public class FailureReportModelTest {
@Test
public void test() throws SAXException, IOException {
String xml = Files.forIO().readFrom(
getClass().getResourceAsStream("/logView.xml"), "utf-8");
FailureReport report = new DefaultParser().parse(xml);
System.out.println(report);
report.getMachines().addMachine("a").addMachine("b");
System.out.println(report);
DefaultJsonBuilder jsonBuilder = new DefaultJsonBuilder();
jsonBuilder.visitFailureReport(report);
System.out.println(jsonBuilder.getString());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<failure-report domain="review" startTime="2011-12-27 14:00:00" endTime="2011-12-27 14:44:59">
<machines>
<machine>127.0.0.1</machine>
<machine>127.0.0.1</machine>
</machines>
<segment timestamp="2011-12-27 14:42:00">
<entry type="exception" messageId="12345" threadId="12365">java.lang.NullPointerExcepiton</entry>
<entry type="error" messageId="12345">java.lang.OutOfMemoryError</entry>
<entry type="exception" messageId="234567">java.lang.RuntimeExcepiton</entry>
<entry type="longUrl" messageId="234567">http://www.dianping.com/a/b/c?x=y</entry>
</segment>
<segment/>
</failure-report>
\ No newline at end of file
......@@ -8,6 +8,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-core</artifactId>
<version>1.0.0</version>
<name>CAT Core</name>
<dependencies>
<dependency>
......@@ -23,4 +24,8 @@
<artifactId>netty</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
</plugins>
</build>
</project>
......@@ -5,14 +5,17 @@ public abstract class AbstractMessageAnalyzer<R> implements MessageAnalyzer {
public void analyze(MessageQueue queue) {
while (queue.isActive()) {
MessageTree tree = queue.poll();
if (tree != null) {
process(tree);
} else {
try {
Thread.sleep(3 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
R result = generate();
store(result);
}
......
......@@ -3,5 +3,9 @@ package com.dianping.cat.message.spi;
public interface MessageQueue {
public boolean isActive();
public int size();
public MessageTree poll();
public void offer(MessageTree tree);
}
......@@ -15,6 +15,11 @@
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册