提交 79199a9a 编写于 作者: F Frankie Wu

add status update task for heartbeat

上级 36d93aad
......@@ -30,7 +30,7 @@
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.12</version>
<version>1.0.14</version>
<executions>
<execution>
<id>generate problem report model</id>
......
......@@ -6,29 +6,35 @@ import com.dianping.cat.consumer.problem.ProblemAnalyzer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
/**
* @author yong.you
* @since Jan 5, 2012
*/
public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerFactory {
@Inject
private boolean m_local;
@Override
public MessageAnalyzer create(String name, long start, long duration, long extraTime) {
if (name.equals("problem")) {
ProblemAnalyzer analyzer = lookup(ProblemAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, extraTime);
analyzer.setLocal(m_local);
return analyzer;
} else if (name.equals("transaction")) {
TransactionAnalyzer analyzer = lookup(TransactionAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, extraTime);
analyzer.setLocal(m_local);
return analyzer;
} else if (name.equals("event")) {
EventAnalyzer analyzer = lookup(EventAnalyzer.class);
analyzer.setAnalyzerInfo(start, duration, extraTime);
analyzer.setLocal(m_local);
return analyzer;
} else if (name.equals("ip")) {
IpAnalyzer analyzer = lookup(IpAnalyzer.class);
......@@ -43,4 +49,8 @@ public class DefaultAnalyzerFactory extends ContainerHolder implements AnalyzerF
public void release(Object component) {
super.release(component);
}
public void setLocal(boolean local) {
m_local = local;
}
}
......@@ -27,7 +27,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactory.class));
all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactory.class) //
.config(E("local").value("true")));
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.req(AnalyzerFactory.class).config(E("consumerId").value("realtime") //
......@@ -48,9 +49,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(TransactionAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, MessagePathBuilder.class));
all.add(C(EventAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, MessagePathBuilder.class));
.req(BucketManager.class, MessagePathBuilder.class));
all.add(C(IpAnalyzer.class).is(PER_LOOKUP));
......
......@@ -10,7 +10,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -51,7 +50,9 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
private long m_duration;
void closeMessageBuckets(Set<String> set) {
private boolean m_local;
void closeMessageBuckets() {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
......@@ -60,7 +61,10 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
try {
localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote");
if (!m_local) {
remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote");
}
} catch (Exception e) {
m_logger.error(String.format("Error when getting message bucket of %s!", timestamp), e);
} finally {
......@@ -78,7 +82,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
closeMessageBuckets();
}
@Override
......@@ -188,34 +192,8 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
// the message is required by some events
if (count > 0) {
String messageId = tree.getMessageId();
try {
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
Bucket<MessageTree> remoteBucket = m_bucketManager
.getMessageBucket(new Date(m_startTime), domain, "remote");
localBucket.storeById(messageId, tree);
remoteBucket.storeById(messageId, tree);
} catch (IOException e) {
m_logger.error("Error when storing message for event analyzer!", e);
}
}
}
int processTransaction(EventReport report, MessageTree tree, Transaction t) {
List<Message> children = t.getChildren();
int count = 0;
for (Message child : children) {
if (child instanceof Transaction) {
count += processTransaction(report, tree, (Transaction) child);
} else if (child instanceof Event) {
count += processEvent(report, tree, (Event) child);
}
storeMessage(tree);
}
return count;
}
int processEvent(EventReport report, MessageTree tree, Event event) {
......@@ -272,6 +250,21 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
}
int processTransaction(EventReport report, MessageTree tree, Transaction t) {
List<Message> children = t.getChildren();
int count = 0;
for (Message child : children) {
if (child instanceof Transaction) {
count += processTransaction(report, tree, (Transaction) child);
} else if (child instanceof Event) {
count += processEvent(report, tree, (Event) child);
}
}
return count;
}
public void setAnalyzerInfo(long startTime, long duration, long extraTime) {
m_extraTime = extraTime;
m_startTime = startTime;
......@@ -280,6 +273,10 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
loadReports();
}
public void setLocal(boolean local) {
m_local = local;
}
@Override
protected void store(List<EventReport> reports) {
if (reports == null || reports.size() == 0) {
......@@ -287,7 +284,27 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
storeReports(reports);
closeMessageBuckets(m_reports.keySet());
closeMessageBuckets();
}
void storeMessage(MessageTree tree) {
String messageId = tree.getMessageId();
String domain = tree.getDomain();
try {
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
localBucket.storeById(messageId, tree);
if (!m_local) {
Bucket<MessageTree> remoteBucket = m_bucketManager
.getMessageBucket(new Date(m_startTime), domain, "remote");
remoteBucket.storeById(messageId, tree);
}
} catch (IOException e) {
m_logger.error("Error when storing message for event analyzer!", e);
}
}
void storeReports(Collection<EventReport> reports) {
......
......@@ -10,7 +10,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -51,7 +50,9 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
private long m_duration;
void closeMessageBuckets(Set<String> set) {
private boolean m_local;
void closeMessageBuckets() {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
......@@ -60,7 +61,10 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
try {
localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote");
if (!m_local) {
remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote");
}
} catch (Exception e) {
m_logger.error(String.format("Error when getting message bucket of %s!", timestamp), e);
} finally {
......@@ -78,7 +82,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
closeMessageBuckets();
}
@Override
......@@ -197,18 +201,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
if (count > 0) {
String messageId = tree.getMessageId();
try {
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
Bucket<MessageTree> remoteBucket = m_bucketManager
.getMessageBucket(new Date(m_startTime), domain, "remote");
localBucket.storeById(messageId, tree);
remoteBucket.storeById(messageId, tree);
} catch (IOException e) {
m_logger.error("Error when storing message for problem analyzer!", e);
}
storeMessage(tree);
}
}
......@@ -220,6 +213,10 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
loadReports();
}
public void setLocal(boolean local) {
m_local = local;
}
@Override
protected void store(List<ProblemReport> reports) {
if (reports == null || reports.size() == 0) {
......@@ -227,7 +224,27 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
storeReports(reports);
closeMessageBuckets(m_reports.keySet());
closeMessageBuckets();
}
void storeMessage(MessageTree tree) {
String messageId = tree.getMessageId();
String domain = tree.getDomain();
try {
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
localBucket.storeById(messageId, tree);
if (!m_local) {
Bucket<MessageTree> remoteBucket = m_bucketManager
.getMessageBucket(new Date(m_startTime), domain, "remote");
remoteBucket.storeById(messageId, tree);
}
} catch (IOException e) {
m_logger.error("Error when storing message for problem analyzer!", e);
}
}
void storeReports(Collection<ProblemReport> reports) {
......@@ -239,7 +256,10 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
try {
localBucket = m_bucketManager.getReportBucket(timestamp, "problem", "local");
remoteBucket = m_bucketManager.getReportBucket(timestamp, "problem", "remote");
if (!m_local) {
remoteBucket = m_bucketManager.getReportBucket(timestamp, "problem", "remote");
}
// delete old one, not append mode
localBucket.deleteAndCreate();
......@@ -249,7 +269,10 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
String domain = report.getDomain();
localBucket.storeById(domain, xml);
remoteBucket.storeById(domain, xml);
if (!m_local) {
remoteBucket.storeById(domain, xml);
}
}
t.setStatus(Message.SUCCESS);
......
......@@ -8,7 +8,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -49,7 +48,9 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
private long m_duration;
void closeMessageBuckets(Set<String> set) {
private boolean m_local;
void closeMessageBuckets() {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
......@@ -57,8 +58,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
Bucket<MessageTree> remoteBucket = null;
try {
localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote");
localBucket = m_bucketManager.getMessageBucket(timestamp, domain, "local");
if (!m_local) {
remoteBucket = m_bucketManager.getMessageBucket(timestamp, domain, "remote");
}
} catch (Exception e) {
m_logger.error(String.format("Error when getting message bucket of %s!", timestamp), e);
} finally {
......@@ -76,7 +80,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
closeMessageBuckets();
}
@Override
......@@ -165,19 +169,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
// the message is required by some transactions
if (count > 0) {
String messageId = tree.getMessageId();
try {
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain,
"local");
Bucket<MessageTree> remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain,
"remote");
localBucket.storeById(messageId, tree);
remoteBucket.storeById(messageId, tree);
} catch (IOException e) {
m_logger.error("Error when storing message for transaction analyzer!", e);
}
storeMessage(tree);
}
}
}
......@@ -275,6 +267,10 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
loadReports();
}
public void setLocal(boolean local) {
m_local = local;
}
@Override
protected void store(List<TransactionReport> reports) {
if (reports == null || reports.size() == 0) {
......@@ -282,19 +278,42 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
storeReports(reports);
closeMessageBuckets(m_reports.keySet());
closeMessageBuckets();
}
void storeMessage(MessageTree tree) {
String messageId = tree.getMessageId();
String domain = tree.getDomain();
try {
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
localBucket.storeById(messageId, tree);
if (!m_local) {
Bucket<MessageTree> remoteBucket = m_bucketManager
.getMessageBucket(new Date(m_startTime), domain, "remote");
remoteBucket.storeById(messageId, tree);
}
} catch (IOException e) {
m_logger.error("Error when storing message for transaction analyzer!", e);
}
}
void storeReports(Collection<TransactionReport> reports) {
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
Bucket<String> localBucket = null;
Bucket<String> remoteBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
try {
localBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local");
remoteBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "remote");
if (!m_local) {
remoteBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "remote");
}
// delete old one, not append mode
localBucket.deleteAndCreate();
......@@ -304,7 +323,10 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String domain = report.getDomain();
localBucket.storeById(domain, xml);
remoteBucket.storeById(domain, xml);
if (!m_local) {
remoteBucket.storeById(domain, xml);
}
}
t.setStatus(Message.SUCCESS);
......
......@@ -3,6 +3,9 @@
<component>
<role>com.dianping.cat.consumer.AnalyzerFactory</role>
<implementation>com.dianping.cat.consumer.DefaultAnalyzerFactory</implementation>
<configuration>
<local>true</local>
</configuration>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
......
......@@ -42,7 +42,7 @@
<version>1.0.13</version>
<executions>
<execution>
<id>default-cli</id>
<id>generate configuration model</id>
<phase>generate-sources</phase>
<goals>
<goal>dal-model</goal>
......@@ -52,6 +52,17 @@
</manifest>
</configuration>
</execution>
<execution>
<id>default-cli</id>
<phase>generate-sources</phase>
<goals>
<goal>dal-model</goal>
</goals>
<configuration>
<manifest>${basedir}/src/main/resources/META-INF/dal/model/status-manifest.xml
</manifest>
</configuration>
</execution>
<execution>
<id>generate plexus component descriptor</id>
<phase>process-classes</phase>
......
......@@ -24,13 +24,16 @@ import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.consumer.DummyConsumer;
import com.dianping.cat.message.spi.consumer.DumpToHtmlConsumer;
import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
import com.dianping.cat.message.spi.internal.DefaultMessageHandler;
import com.dianping.cat.message.spi.internal.DefaultMessagePathBuilder;
import com.dianping.cat.message.spi.internal.DefaultMessageStatistics;
import com.dianping.cat.message.spi.internal.DefaultMessageStorage;
import com.dianping.cat.status.StatusUpdateTask;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -45,7 +48,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageReceiver.class, "in-memory", InMemoryReceiver.class) //
.req(InMemoryQueue.class));
all.add(C(MessageManager.class, DefaultMessageManager.class));
all.add(C(MessageManager.class, DefaultMessageManager.class) //
.req(MessageStatistics.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class) //
.req(MessageManager.class, MessageIdFactory.class));
all.add(C(MessageIdFactory.class));
......@@ -63,11 +67,12 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers"));
all.add(C(MessageQueue.class, DefaultMessageQueue.class) //
.config(E("size").value("100000")) //
.is(PER_LOOKUP));
.config(E("size").value("100000")) //
.is(PER_LOOKUP));
all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) //
.is(PER_LOOKUP) //
.req(MessageStatistics.class, "default", "m_statistics") //
.req(MessageCodec.class, "plain-text", "m_codec")//
.req(MessageQueue.class, "default", "m_queue"));
all.add(C(MessageReceiver.class, "tcp-socket", TcpSocketReceiver.class) //
......@@ -78,6 +83,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageHandler.class, DefaultMessageHandler.class) //
.req(MessageManager.class, MessageConsumerRegistry.class));
all.add(C(MessageStatistics.class, DefaultMessageStatistics.class));
all.add(C(StatusUpdateTask.class) //
.req(MessageStatistics.class));
all.addAll(new CodecComponentConfigurator().defineComponents());
all.addAll(new StorageComponentConfigurator().defineComponents());
......
......@@ -17,11 +17,19 @@ import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.status.StatusUpdateTask;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultMessageManager extends ContainerHolder implements MessageManager, LogEnabled {
@Inject
private MessageStatistics m_statistics;
private StatusUpdateTask m_statusUpdateTask;
private MessageIdFactory m_factory;
private TransportManager m_manager;
......@@ -76,6 +84,10 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
if (sender != null) {
sender.send(tree);
if (m_statistics != null) {
m_statistics.onSending(tree);
}
}
}
}
......@@ -153,14 +165,18 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
m_logger.warn("Unable to get local host!", e);
}
// initialize milli-second resolution level timer
MilliSecondTimer.initialize();
m_manager = lookup(TransportManager.class);
m_factory = lookup(MessageIdFactory.class);
m_statusUpdateTask = lookup(StatusUpdateTask.class);
// initialize domain and ip address
m_factory.initialize(m_domain.getId());
// initialize milli-second resolution level timer
MilliSecondTimer.initialize();
// start status update task
new Thread(m_statusUpdateTask).start();
}
@Override
......
......@@ -154,8 +154,6 @@ public class TcpSocketReceiver implements MessageReceiver, LogEnabled {
return null;
}
// TODO filter
return buffer.readBytes(length);
}
}
......@@ -168,7 +166,7 @@ public class TcpSocketReceiver implements MessageReceiver, LogEnabled {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
event.getCause().printStackTrace();
m_logger.warn(event.getChannel().toString(), event.getCause());
event.getChannel().close();
}
......
......@@ -22,21 +22,25 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class TcpSocketSender extends Thread implements MessageSender, LogEnabled {
@Inject
private String m_host;
private MessageCodec m_codec;
@Inject
private int m_port = 2280; // default port number from phone, C:2, A:2, T:8
private MessageQueue m_queue;
@Inject
private MessageCodec m_codec;
private MessageStatistics m_statistics;
@Inject
private String m_host;
@Inject
private MessageQueue m_queue;
private int m_port = 2280; // default port number from phone, C:2, A:2, T:8
private ChannelFactory m_factory;
......@@ -91,6 +95,7 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled
m_bootstrap = bootstrap;
this.setName("TcpSocketSender");
this.start();
}
......@@ -142,7 +147,10 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled
boolean result = m_queue.offer(tree);
if (!result) {
System.out.println("Message queue is full in tcp socket sender!");
if (m_statistics != null) {
m_statistics.onOverflowed(tree);
}
m_logger.error("Message queue is full in tcp socket sender!");
}
}
......@@ -156,7 +164,14 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(10 * 1024); // 10K
m_codec.encode(tree, buf);
int size = buf.readableBytes();
m_future.getChannel().write(buf);
if (m_statistics != null) {
m_statistics.onBytes(size);
}
}
}
......
package com.dianping.cat.message.spi;
public interface MessageFilter {
public String getConsumerId();
public boolean doFilter(byte[] data);
}
package com.dianping.cat.message.spi;
public interface MessageStatistics {
public long getProduced();
public long getOverflowed();
public long getBytes();
public void onSending(MessageTree tree);
public void onOverflowed(MessageTree tree);
public void onBytes(int size);
}
package com.dianping.cat.message.spi.internal;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageStatistics implements MessageStatistics {
private long m_produced;
private long m_overflowed;
private long m_bytes;
@Override
public void onSending(MessageTree tree) {
m_produced++;
}
@Override
public void onOverflowed(MessageTree tree) {
m_overflowed++;
}
@Override
public void onBytes(int bytes) {
m_bytes += bytes;
}
@Override
public long getProduced() {
return m_produced;
}
@Override
public long getOverflowed() {
return m_overflowed;
}
@Override
public long getBytes() {
return m_bytes;
}
}
package com.dianping.cat.status;
import java.io.File;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.util.List;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.status.model.entity.DiskSpaceInfo;
import com.dianping.cat.status.model.entity.GcInfo;
import com.dianping.cat.status.model.entity.MemoryInfo;
import com.dianping.cat.status.model.entity.MessageInfo;
import com.dianping.cat.status.model.entity.OsInfo;
import com.dianping.cat.status.model.entity.RuntimeInfo;
import com.dianping.cat.status.model.entity.StatusInfo;
import com.dianping.cat.status.model.entity.ThreadInfo;
import com.dianping.cat.status.model.transform.BaseVisitor;
class StatusInfoCollector extends BaseVisitor {
private MessageStatistics m_statistics;
public StatusInfoCollector(MessageStatistics statistics) {
m_statistics = statistics;
}
long getGcCount(List<GarbageCollectorMXBean> mxbeans) {
long count = 0;
for (GarbageCollectorMXBean mxbean : mxbeans) {
if (mxbean.isValid()) {
count += mxbean.getCollectionCount();
}
}
return count;
}
long getGcTime(List<GarbageCollectorMXBean> mxbeans) {
long time = 0;
for (GarbageCollectorMXBean mxbean : mxbeans) {
if (mxbean.isValid()) {
time += mxbean.getCollectionTime();
}
}
return time;
}
boolean isInstanceOfInterface(Class<?> clazz, String interfaceName) {
if (clazz == Object.class) {
return false;
} else if (clazz.getName().equals(interfaceName)) {
return true;
}
Class<?>[] interfaceclasses = clazz.getInterfaces();
for (Class<?> interfaceClass : interfaceclasses) {
if (isInstanceOfInterface(interfaceClass, interfaceName)) {
return true;
}
}
return isInstanceOfInterface(clazz.getSuperclass(), interfaceName);
}
@Override
public void visitDiskSpace(DiskSpaceInfo diskSpace) {
File workingDir = new File(".");
diskSpace.setTotal(workingDir.getTotalSpace());
diskSpace.setFree(workingDir.getFreeSpace());
diskSpace.setUsable(workingDir.getUsableSpace());
}
@Override
public void visitGc(GcInfo gc) {
List<GarbageCollectorMXBean> beans = ManagementFactory.getGarbageCollectorMXBeans();
gc.setCount(getGcCount(beans));
gc.setTime(getGcTime(beans));
}
@Override
public void visitMemory(MemoryInfo memory) {
MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
Runtime runtime = Runtime.getRuntime();
memory.setTotal(runtime.totalMemory());
memory.setFree(runtime.freeMemory());
memory.setHeapUsage(bean.getHeapMemoryUsage().getUsed());
memory.setNonHeapUsage(bean.getNonHeapMemoryUsage().getUsed());
memory.setGc(new GcInfo());
super.visitMemory(memory);
}
@Override
public void visitMessage(MessageInfo message) {
if (m_statistics != null) {
message.setProduced(m_statistics.getProduced());
message.setOverflowed(m_statistics.getOverflowed());
message.setBytes(m_statistics.getBytes());
}
}
@Override
public void visitOs(OsInfo os) {
OperatingSystemMXBean bean = ManagementFactory.getOperatingSystemMXBean();
os.setArch(bean.getArch());
os.setName(bean.getName());
os.setVersion(bean.getName());
os.setAvailableProcessors(bean.getAvailableProcessors());
os.setSystemLoadAverage(bean.getSystemLoadAverage());
// for Sun JDK
if (isInstanceOfInterface(bean.getClass(), "com.sun.management.OperatingSystemMXBean")) {
com.sun.management.OperatingSystemMXBean b = (com.sun.management.OperatingSystemMXBean) bean;
os.setTotalPhysicalMemory(b.getTotalPhysicalMemorySize());
os.setFreePhysicalMemory(b.getFreePhysicalMemorySize());
os.setTotalSwapSpace(b.getTotalSwapSpaceSize());
os.setFreeSwapSpace(b.getFreeSwapSpaceSize());
os.setProcessTime(b.getProcessCpuTime());
os.setCommittedVirtualMemory(b.getCommittedVirtualMemorySize());
}
}
@Override
public void visitRuntime(RuntimeInfo runtime) {
RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
runtime.setStartTime(bean.getStartTime());
runtime.setUpTime(bean.getUptime());
}
@Override
public void visitStatus(StatusInfo status) {
status.setOs(new OsInfo());
status.setDiskSpace(new DiskSpaceInfo());
status.setRuntime(new RuntimeInfo());
status.setMemory(new MemoryInfo());
status.setThread(new ThreadInfo());
status.setMessage(new MessageInfo());
super.visitStatus(status);
}
@Override
public void visitThread(ThreadInfo thread) {
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
thread.setCount(bean.getThreadCount());
thread.setDaemonCount(bean.getDaemonThreadCount());
thread.setPeekCount(bean.getPeakThreadCount());
thread.setTotalStartedCount(bean.getTotalStartedThreadCount());
}
}
\ No newline at end of file
package com.dianping.cat.status;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.internal.MilliSecondTimer;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.status.model.entity.StatusInfo;
import com.site.lookup.annotation.Inject;
public class StatusUpdateTask implements Runnable, Initializable {
@Inject
private MessageStatistics m_statistics;
private boolean m_active = true;
private String m_ipAddress;
private long m_interval = 1000; // 1 ms
@Override
public void initialize() throws InitializationException {
try {
m_ipAddress = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
// ignore it
}
}
@Override
public void run() {
while (m_active) {
long start = MilliSecondTimer.currentTimeMillis();
Heartbeat heartbeat = Cat.getProducer().newHeartbeat("Heartbeat", m_ipAddress);
StatusInfo status = new StatusInfo();
status.accept(new StatusInfoCollector(m_statistics));
heartbeat.addData(status.toString());
heartbeat.setStatus(Message.SUCCESS);
heartbeat.complete();
long elapsed = MilliSecondTimer.currentTimeMillis() - start;
if (elapsed < m_interval) {
try {
Thread.sleep(m_interval - elapsed);
} catch (InterruptedException e) {
break;
}
}
}
}
public void setInterval(long interval) {
m_interval = interval;
}
public void shutdown() {
m_active = false;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<model>
<entity name="status" root="true">
<entity-ref name="runtime" />
<entity-ref name="os" />
<entity-ref name="disk-space" />
<entity-ref name="memory" />
<entity-ref name="thread" />
<entity-ref name="message" />
</entity>
<entity name="runtime">
<attribute name="start-time" value-type="int" />
<attribute name="up-time" value-type="int" />
</entity>
<entity name="os">
<attribute name="name" value-type="String" />
<attribute name="arch" value-type="String" />
<attribute name="version" value-type="String" />
<attribute name="available-processors" value-type="int" />
<attribute name="system-load-average" value-type="double" />
<attribute name="process-time" value-type="int" />
<attribute name="total-physical-memory" value-type="int" />
<attribute name="free-physical-memory" value-type="int" />
<attribute name="committed-virtual-memory" value-type="int" />
<attribute name="total-swap-space" value-type="int" />
<attribute name="free-swap-space" value-type="int" />
</entity>
<entity name="disk-space">
<attribute name="total" value-type="int" />
<attribute name="free" value-type="int" />
<attribute name="usable" value-type="int" />
</entity>
<entity name="memory">
<attribute name="total" value-type="int" />
<attribute name="free" value-type="int" />
<attribute name="heap-usage" value-type="int" />
<attribute name="non-heap-usage" value-type="int" />
<entity-ref name="gc" />
</entity>
<entity name="gc">
<attribute name="count" value-type="int" />
<attribute name="time" value-type="int" />
</entity>
<entity name="thread">
<attribute name="count" value-type="int" />
<attribute name="daemon-count" value-type="int" />
<attribute name="peek-count" value-type="int" />
<attribute name="total-started-count" value-type="int" />
</entity>
<entity name="message">
<attribute name="produced" value-type="int" />
<attribute name="overflowed" value-type="int" />
<attribute name="bytes" value-type="int" />
</entity>
</model>
<?xml version="1.0" encoding="UTF-8"?>
<manifest>
<file path="status-codegen.xml" />
<file path="status-model.xml" />
</manifest>
<?xml version="1.0" encoding="UTF-8"?>
<model model-package="com.dianping.cat.status.model" enable-xml-parser="true" enable-xml-schema="true"
enable-merger="true" enable-model-test="true" enable-base-visitor="true">
<entity name="status" class-name="StatusInfo" dynamic-attributes="true" />
<entity name="runtime" class-name="RuntimeInfo">
<attribute name="start-time" value-type="long" primitive="true" />
<attribute name="up-time" value-type="long" primitive="true" />
</entity>
<entity name="os" class-name="OsInfo">
<attribute name="available-processors" value-type="int" primitive="true" />
<attribute name="system-load-average" value-type="double" primitive="true" />
<attribute name="process-time" value-type="long" primitive="true" />
<attribute name="total-physical-memory" value-type="long" primitive="true" />
<attribute name="free-physical-memory" value-type="long" primitive="true" />
<attribute name="committed-virtual-memory" value-type="long" primitive="true" />
<attribute name="total-swap-space" value-type="long" primitive="true" />
<attribute name="free-swap-space" value-type="long" primitive="true" />
</entity>
<entity name="disk-space" class-name="DiskSpaceInfo">
<attribute name="total" value-type="long" primitive="true" />
<attribute name="free" value-type="long" primitive="true" />
<attribute name="usable" value-type="long" primitive="true" />
</entity>
<entity name="memory" class-name="MemoryInfo">
<attribute name="total" value-type="long" primitive="true" />
<attribute name="free" value-type="long" primitive="true" />
<attribute name="heap-usage" value-type="long" primitive="true" />
<attribute name="non-heap-usage" value-type="long" primitive="true" />
<entity-ref name="gc" />
</entity>
<entity name="gc" class-name="GcInfo">
<attribute name="count" value-type="long" primitive="true" />
<attribute name="time" value-type="long" primitive="true" />
</entity>
<entity name="thread" class-name="ThreadInfo">
<attribute name="count" value-type="int" primitive="true" />
<attribute name="daemon-count" value-type="int" primitive="true" />
<attribute name="peek-count" value-type="int" primitive="true" />
<attribute name="total-started-count" value-type="long" primitive="true" />
</entity>
<entity name="message" class-name="MessageInfo">
<attribute name="produced" value-type="long" primitive="true" />
<attribute name="overflowed" value-type="long" primitive="true" />
<attribute name="bytes" value-type="long" primitive="true" />
</entity>
</model>
\ No newline at end of file
......@@ -27,6 +27,11 @@
<component>
<role>com.dianping.cat.message.spi.MessageManager</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStatistics</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.MessageProducer</role>
......@@ -113,6 +118,10 @@
<implementation>com.dianping.cat.message.io.TcpSocketSender</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStatistics</role>
<field-name>m_statistics</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
......@@ -157,6 +166,19 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStatistics</role>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageStatistics</implementation>
</component>
<component>
<role>com.dianping.cat.status.StatusUpdateTask</role>
<implementation>com.dianping.cat.status.StatusUpdateTask</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStatistics</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.codec.BufferWriter</role>
<role-hint>escape</role-hint>
......
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified" attributeFormDefault="unqualified">
<xs:element name="status" type="StatusType"/>
<xs:complexType name="StatusType">
<xs:sequence>
<xs:element name="runtime" type="RuntimeType" minOccurs="0" maxOccurs="1"/>
<xs:element name="os" type="OsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="cpu" type="CpuType" minOccurs="0" maxOccurs="1"/>
<xs:element name="memory" type="MemoryType" minOccurs="0" maxOccurs="1"/>
<xs:element name="thread" type="ThreadType" minOccurs="0" maxOccurs="1"/>
<xs:element name="message" type="MessageType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="RuntimeType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="start-time" type="xs:int"/>
<xs:attribute name="up-time" type="xs:int"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<xs:complexType name="OsType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="name" type="xs:string"/>
<xs:attribute name="arch" type="xs:string"/>
<xs:attribute name="version" type="xs:string"/>
<xs:attribute name="available-processors" type="xs:int"/>
<xs:attribute name="system-load-average" type="xs:double"/>
<xs:attribute name="total-physical-memory" type="xs:int"/>
<xs:attribute name="free-physical-memory" type="xs:int"/>
<xs:attribute name="committed-virtual-memory" type="xs:int"/>
<xs:attribute name="total-swap-space" type="xs:int"/>
<xs:attribute name="free-swap-space" type="xs:int"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<xs:complexType name="CpuType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="process-time" type="xs:int"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<xs:complexType name="MemoryType">
<xs:sequence>
<xs:element name="gc" type="GcType" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute name="total" type="xs:int"/>
<xs:attribute name="free" type="xs:int"/>
<xs:attribute name="used" type="xs:int"/>
</xs:complexType>
<xs:complexType name="GcType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="count" type="xs:int"/>
<xs:attribute name="time" type="xs:int"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<xs:complexType name="ThreadType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="count" type="xs:int"/>
<xs:attribute name="daemon-count" type="xs:int"/>
<xs:attribute name="peek-count" type="xs:int"/>
<xs:attribute name="total-started-count" type="xs:int"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<xs:complexType name="MessageType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="constructed" type="xs:int"/>
<xs:attribute name="overflowed" type="xs:int"/>
<xs:attribute name="bytes" type="xs:int"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
</xs:schema>
package com.dianping.cat.status;
import org.junit.Test;
import com.dianping.cat.status.model.entity.StatusInfo;
public class StatusInfoCollectorTest {
@Test
public void test() {
StatusInfo status = new StatusInfo();
status.accept(new StatusInfoCollector(null));
System.out.println(status);
}
}
package com.dianping.cat.status.model;
import org.junit.Assert;
import org.junit.Test;
import com.dianping.cat.status.model.entity.StatusInfo;
import com.dianping.cat.status.model.transform.DefaultXmlBuilder;
import com.dianping.cat.status.model.transform.DefaultXmlParser;
import com.site.helper.Files;
public class StatusInfoTest {
@Test
public void testXml() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("status.xml"), "utf-8");
StatusInfo root = parser.parse(source);
String xml = new DefaultXmlBuilder().buildXml(root);
String expected = source;
Assert.assertEquals("XML is not well parsed!", expected.replace("\r", ""), xml.replace("\r", ""));
}
}
<status xmlns:xsi="http://www.w3.org/2001/XMLSchema" xsi:noNamespaceSchemaLocation="status.xsd">
<runtime start-time="1332558734422" up-time="373"/>
<os name="Mac OS X" arch="x86_64" version="Mac OS X" available-processors="4" system-load-average="0.9189453125" process-time="730000000" total-physical-memory="4294967296" free-physical-memory="55799808" committed-virtual-memory="2921177088" total-swap-space="7516192768" free-swap-space="337608704"/>
<memory total="85000192" free="80400592" heap-usage="4599600" non-heap-usage="6804336">
<gc count="0" time="0"/>
</memory>
<thread count="5" daemon-count="3" peek-count="5" total-started-count="5"/>
<message produced="123" overflowed="2" bytes="12345"/>
</status>
......@@ -4,7 +4,16 @@ import java.io.IOException;
import javax.servlet.ServletException;
import com.dianping.cat.consumer.event.model.entity.EventName;
import com.dianping.cat.consumer.event.model.entity.EventType;
import com.dianping.cat.consumer.transaction.model.IEntity;
import com.dianping.cat.consumer.transaction.model.entity.Duration;
import com.dianping.cat.consumer.transaction.model.entity.Range;
import com.dianping.cat.consumer.transaction.model.entity.TransactionName;
import com.dianping.cat.consumer.transaction.model.entity.TransactionType;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder;
import com.dianping.cat.report.ReportPage;
import com.dianping.cat.report.page.model.event.LocalEventService;
import com.dianping.cat.report.page.model.logview.LocalLogViewService;
import com.dianping.cat.report.page.model.problem.LocalProblemService;
import com.dianping.cat.report.page.model.spi.ModelRequest;
......@@ -24,13 +33,36 @@ public class Handler extends ContainerHolder implements PageHandler<Context> {
@Inject(type = ModelService.class, value = "transaction-local")
private LocalTransactionService m_transactionService;
@Inject(type = ModelService.class, value = "event-local")
private LocalEventService m_eventService;
@Inject(type = ModelService.class, value = "problem-local")
private LocalProblemService m_problemService;
@Inject(type = ModelService.class, value = "logview-local")
private LocalLogViewService m_logviewService;
private String doFilter(Payload payload, Object dataModel) {
String report = payload.getReport();
if ("transaction".equals(report)) {
TransactionReportFilter filter = new TransactionReportFilter(payload.getType(), payload.getName());
return filter.buildXml((IEntity<?>) dataModel);
} else if ("event".equals(report)) {
EventReportFilter filter = new EventReportFilter(payload.getType(), payload.getName());
return filter.buildXml((com.dianping.cat.consumer.event.model.IEntity<?>) dataModel);
} else if ("problem".equals(report)) {
ProblemReportFilter filter = new ProblemReportFilter();
return filter.buildXml((com.dianping.cat.consumer.problem.model.IEntity<?>) dataModel);
} else {
return String.valueOf(dataModel);
}
}
@Override
@PayloadMeta(Payload.class)
@InboundActionMeta(name = "model")
......@@ -55,6 +87,8 @@ public class Handler extends ContainerHolder implements PageHandler<Context> {
if ("transaction".equals(report)) {
response = m_transactionService.invoke(request);
} else if ("event".equals(report)) {
response = m_eventService.invoke(request);
} else if ("problem".equals(report)) {
response = m_problemService.invoke(request);
} else if ("logview".equals(report)) {
......@@ -66,11 +100,108 @@ public class Handler extends ContainerHolder implements PageHandler<Context> {
Object dataModel = response.getModel();
model.setModel(dataModel);
model.setModelInXml(dataModel == null ? "" : String.valueOf(dataModel));
model.setModelInXml(dataModel == null ? "" : doFilter(payload, dataModel));
} catch (Throwable e) {
model.setException(e);
}
m_jspViewer.view(ctx, model);
}
static class EventReportFilter extends com.dianping.cat.consumer.event.model.transform.DefaultXmlBuilder {
private String m_type;
private String m_name;
public EventReportFilter(String type, String name) {
m_type = type;
m_name = name;
}
@Override
public void visitName(EventName name) {
if (m_type == null) {
// skip it
} else if (m_name != null && name.getId().equals(m_name)) {
super.visitName(name);
} else {
super.visitName(name);
}
}
@Override
public void visitRange(com.dianping.cat.consumer.event.model.entity.Range range) {
if (m_type != null && m_name != null) {
super.visitRange(range);
}
}
@Override
public void visitType(EventType type) {
if (m_type == null) {
super.visitType(type);
} else if (m_type != null && type.getId().equals(m_type)) {
type.setSuccessMessageUrl(null);
type.setFailMessageUrl(null);
super.visitType(type);
} else {
// skip it
}
}
}
static class ProblemReportFilter extends com.dianping.cat.consumer.problem.model.transform.DefaultXmlBuilder {
// TODO
}
static class TransactionReportFilter extends DefaultXmlBuilder {
private String m_type;
private String m_name;
public TransactionReportFilter(String type, String name) {
m_type = type;
m_name = name;
}
@Override
public void visitDuration(Duration duration) {
if (m_type != null && m_name != null) {
super.visitDuration(duration);
}
}
@Override
public void visitName(TransactionName name) {
if (m_type == null) {
// skip it
} else if (m_name != null && name.getId().equals(m_name)) {
super.visitName(name);
} else {
super.visitName(name);
}
}
@Override
public void visitRange(Range range) {
if (m_type != null && m_name != null) {
super.visitRange(range);
}
}
@Override
public void visitType(TransactionType type) {
if (m_type == null) {
super.visitType(type);
} else if (m_type != null && type.getId().equals(m_type)) {
type.setSuccessMessageUrl(null);
type.setFailMessageUrl(null);
super.visitType(type);
} else {
// skip it
}
}
}
}
......@@ -17,8 +17,10 @@ public class Payload implements ActionPayload<ReportPage, Action> {
@PathMeta("path")
private String[] m_path;
@FieldMeta("ip")
private String m_ip;
@FieldMeta("type")
private String m_type;
@FieldMeta("name")
private String m_name;
@Override
public Action getAction() {
......@@ -33,10 +35,6 @@ public class Payload implements ActionPayload<ReportPage, Action> {
}
}
public String getIp() {
return m_ip;
}
@Override
public ReportPage getPage() {
return m_page;
......@@ -58,12 +56,8 @@ public class Payload implements ActionPayload<ReportPage, Action> {
}
}
public void setAction(Action action) {
m_action = action;
}
public void setIp(String ip) {
m_ip = ip;
public void setAction(String action) {
m_action = Action.getByName(action, Action.XML);
}
@Override
......@@ -75,7 +69,26 @@ public class Payload implements ActionPayload<ReportPage, Action> {
m_path = path;
}
public String getType() {
return m_type;
}
public void setType(String type) {
m_type = type;
}
public String getName() {
return m_name;
}
public void setName(String name) {
m_name = name;
}
@Override
public void validate(ActionContext<?> ctx) {
if (m_action == null) {
m_action = Action.XML;
}
}
}
......@@ -53,7 +53,7 @@ public class Handler implements PageHandler<Context>, Initializable {
String date = String.valueOf(payload.getDate());
ModelRequest request = new ModelRequest(domain, payload.getPeriod()) //
.setProperty("date", date) //
.setProperty("type", payload.getType())//
.setProperty("type", payload.getType()) //
.setProperty("name", payload.getName());
ModelResponse<TransactionReport> response = m_service.invoke(request);
TransactionReport report = response.getModel();
......@@ -67,11 +67,9 @@ public class Handler implements PageHandler<Context>, Initializable {
}
return n;
} else {
return null;
}
Cat.getManager().getThreadLocalMessageTree();
return null;
}
private TransactionReport getReport(Payload payload) {
......
package com.dianping.cat.report.page.model;
import junit.framework.Assert;
import org.junit.Test;
import com.dianping.cat.consumer.event.model.entity.EventReport;
import com.dianping.cat.consumer.event.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.model.Handler.EventReportFilter;
import com.site.helper.Files;
public class EventReportFilterTest {
@Test
public void test() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("event.xml"), "utf-8");
EventReport report = parser.parse(source);
EventReportFilter f1 = new EventReportFilter(null, null);
String expected1 = Files.forIO().readFrom(getClass().getResourceAsStream("event-type.xml"), "utf-8");
Assert.assertEquals(expected1.replaceAll("\r", ""), f1.buildXml(report).replaceAll("\r", ""));
EventReportFilter f2 = new EventReportFilter("URL", null);
String expected2 = Files.forIO().readFrom(getClass().getResourceAsStream("event-name.xml"), "utf-8");
Assert.assertEquals(expected2.replaceAll("\r", ""), f2.buildXml(report).replaceAll("\r", ""));
}
}
package com.dianping.cat.report.page.model;
import junit.framework.Assert;
import org.junit.Test;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.report.page.model.Handler.TransactionReportFilter;
import com.site.helper.Files;
public class TransactionReportFilterTest {
@Test
public void test() throws Exception {
DefaultXmlParser parser = new DefaultXmlParser();
String source = Files.forIO().readFrom(getClass().getResourceAsStream("transaction.xml"), "utf-8");
TransactionReport report = parser.parse(source);
TransactionReportFilter f1 = new TransactionReportFilter(null, null);
String expected1 = Files.forIO().readFrom(getClass().getResourceAsStream("transaction-type.xml"), "utf-8");
Assert.assertEquals(expected1.replaceAll("\r", ""), f1.buildXml(report).replaceAll("\r", ""));
TransactionReportFilter f2 = new TransactionReportFilter("URL", null);
String expected2 = Files.forIO().readFrom(getClass().getResourceAsStream("transaction-name.xml"), "utf-8");
Assert.assertEquals(expected2.replaceAll("\r", ""), f2.buildXml(report).replaceAll("\r", ""));
}
}
<event-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00">
<name id="home" totalCount="1" failCount="0" failPercent="0.00">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="service" totalCount="8" failCount="0" failPercent="0.00">
<successMessageUrl>20120216/23/Cat/b10bdefb-1eca-45e1-a9c3-52367078b5a2.html</successMessageUrl>
</name>
<name id="t" totalCount="1" failCount="0" failPercent="0.00">
<successMessageUrl>20120216/23/Cat/8e7c91a0-7549-4b13-b43b-252b2a6ef4bb.html</successMessageUrl>
</name>
<name id="ip" totalCount="1" failCount="0" failPercent="0.00">
<successMessageUrl>20120216/23/Cat/2029c32e-b692-4e43-8eaf-96b8d6c846a2.html</successMessageUrl>
</name>
</type>
</event-report>
<event-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="MVC" totalCount="33" failCount="0" failPercent="0.00">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="NEW1" totalCount="33" failCount="0" failPercent="0.00">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
</event-report>
<event-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="194.0" avg="47.1" sum="518.0" sum2="73942.0" std="67.1">
<name id="home" totalCount="1" failCount="0" failPercent="0.00" min="175.0" max="175.0" avg="175.0" sum="175.0" sum2="30625.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
<range minute="5" count="123" events="3"/>
<range minute="10" count="123" events="3"/>
</name>
<name id="service" totalCount="8" failCount="0" failPercent="0.00" min="1.0" max="58.0" avg="13.0" sum="104.0" sum2="3952.0" std="18.0">
<successMessageUrl>20120216/23/Cat/b10bdefb-1eca-45e1-a9c3-52367078b5a2.html</successMessageUrl>
</name>
<name id="t" totalCount="1" failCount="0" failPercent="0.00" min="193.0" max="193.0" avg="193.0" sum="193.0" sum2="37249.0" std="0.0">
<successMessageUrl>20120216/23/Cat/8e7c91a0-7549-4b13-b43b-252b2a6ef4bb.html</successMessageUrl>
</name>
<name id="ip" totalCount="1" failCount="0" failPercent="0.00" min="46.0" max="46.0" avg="46.0" sum="46.0" sum2="2116.0" std="0.0">
<successMessageUrl>20120216/23/Cat/2029c32e-b692-4e43-8eaf-96b8d6c846a2.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="MVC" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<name id="InboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="17.0" avg="1.5" sum="17.0" sum2="289.0" std="4.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="TransitionPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="OutboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="482.0" sum2="68088.0" std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="NEW1" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<name id="InboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="17.0" avg="1.5" sum="17.0" sum2="289.0" std="4.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="TransitionPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="OutboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="482.0" sum2="68088.0" std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
</event-report>
<transaction-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="194.0" avg="47.1" sum="518.0" sum2="73942.0" std="67.1">
<name id="home" totalCount="1" failCount="0" failPercent="0.00" min="175.0" max="175.0" avg="175.0" sum="175.0" sum2="30625.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="service" totalCount="8" failCount="0" failPercent="0.00" min="1.0" max="58.0" avg="13.0" sum="104.0" sum2="3952.0" std="18.0">
<successMessageUrl>20120216/23/Cat/b10bdefb-1eca-45e1-a9c3-52367078b5a2.html</successMessageUrl>
</name>
<name id="t" totalCount="1" failCount="0" failPercent="0.00" min="193.0" max="193.0" avg="193.0" sum="193.0" sum2="37249.0" std="0.0">
<successMessageUrl>20120216/23/Cat/8e7c91a0-7549-4b13-b43b-252b2a6ef4bb.html</successMessageUrl>
</name>
<name id="ip" totalCount="1" failCount="0" failPercent="0.00" min="46.0" max="46.0" avg="46.0" sum="46.0" sum2="2116.0" std="0.0">
<successMessageUrl>20120216/23/Cat/2029c32e-b692-4e43-8eaf-96b8d6c846a2.html</successMessageUrl>
</name>
</type>
</transaction-report>
<transaction-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="194.0" avg="47.1" sum="518.0" sum2="73942.0" std="67.1">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="MVC" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="NEW1" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
</transaction-report>
<transaction-report domain="Cat" startTime="2012-02-16 23:00:00" endTime="2012-02-16 23:59:00">
<domain>Cat</domain>
<type id="URL" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="194.0" avg="47.1" sum="518.0" sum2="73942.0" std="67.1">
<name id="home" totalCount="1" failCount="0" failPercent="0.00" min="175.0" max="175.0" avg="175.0" sum="175.0" sum2="30625.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
<range minute="5" count="123" sum="123456" avg="22.2" transactions="3"/>
<range minute="10" count="123" sum="12457" avg="222" transactions="3"/>
<duration value="128" count="34"/>
<duration value="256" count="12"/>
</name>
<name id="service" totalCount="8" failCount="0" failPercent="0.00" min="1.0" max="58.0" avg="13.0" sum="104.0" sum2="3952.0" std="18.0">
<successMessageUrl>20120216/23/Cat/b10bdefb-1eca-45e1-a9c3-52367078b5a2.html</successMessageUrl>
</name>
<name id="t" totalCount="1" failCount="0" failPercent="0.00" min="193.0" max="193.0" avg="193.0" sum="193.0" sum2="37249.0" std="0.0">
<successMessageUrl>20120216/23/Cat/8e7c91a0-7549-4b13-b43b-252b2a6ef4bb.html</successMessageUrl>
</name>
<name id="ip" totalCount="1" failCount="0" failPercent="0.00" min="46.0" max="46.0" avg="46.0" sum="46.0" sum2="2116.0" std="0.0">
<successMessageUrl>20120216/23/Cat/2029c32e-b692-4e43-8eaf-96b8d6c846a2.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="MVC" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<name id="InboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="17.0" avg="1.5" sum="17.0" sum2="289.0" std="4.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="TransitionPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="OutboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="482.0" sum2="68088.0" std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
<type id="NEW1" totalCount="33" failCount="0" failPercent="0.00" min="0.0" max="191.0" avg="15.1" sum="499.0" sum2="68377.0" std="42.9">
<name id="InboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="17.0" avg="1.5" sum="17.0" sum2="289.0" std="4.9">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="TransitionPhase" totalCount="11" failCount="0" failPercent="0.00" min="0.0" max="4.9E-324" avg="0.0" sum="0.0" sum2="0.0" std="0.0">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<name id="OutboundPhase" totalCount="11" failCount="0" failPercent="0.00" min="1.0" max="191.0" avg="43.8" sum="482.0" sum2="68088.0" std="65.3">
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</name>
<successMessageUrl>20120216/23/Cat/1168a02c-664b-440c-9ef4-a87bac4d9cb1.html</successMessageUrl>
</type>
</transaction-report>
......@@ -54,7 +54,7 @@
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.12</version>
<version>1.0.14</version>
<executions>
<execution>
<id>generate dal jdbc model</id>
......
......@@ -30,7 +30,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
String serverUri = property("serve-uri", "hdfs://192.168.7.43:9000/user/cat");
String serverUri = property("server-uri", "hdfs://192.168.7.43:9000/user/cat");
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
......
......@@ -75,6 +75,8 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
}
public void setServerUri(String serverUri) {
m_serverUri = URI.create(serverUri);
if (serverUri != null && serverUri.length() > 0) {
m_serverUri = URI.create(serverUri);
}
}
}
......@@ -22,7 +22,8 @@ import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled {
public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable,
LogEnabled {
@Inject
private MessagePathBuilder m_builder;
......@@ -143,6 +144,8 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp
}
public void setServerUri(String serverUri) {
m_serverUri = URI.create(serverUri);
if (serverUri != null && serverUri.length() > 0) {
m_serverUri = URI.create(serverUri);
}
}
}
......@@ -19,7 +19,7 @@
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation>
<configuration>
<baseDir>data</baseDir>
<serverUri>hdfs://192.168.7.43:9000/user/cat</serverUri>
<serverUri></serverUri>
</configuration>
<requirements>
<requirement>
......@@ -43,7 +43,7 @@
<implementation>com.dianping.cat.job.hdfs.DefaultInputChannelManager</implementation>
<configuration>
<baseDir>data</baseDir>
<serverUri>hdfs://192.168.7.43:9000/user/cat</serverUri>
<serverUri></serverUri>
</configuration>
</component>
<component>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册