提交 e680310d 编写于 作者: Y yong.you

Merge branch 'biz' of github.com:dianping/cat into biz

......@@ -3,7 +3,7 @@
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.dianping.cat</groupId>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-client</artifactId>
......
......@@ -14,6 +14,7 @@ import org.unidal.initialization.Module;
import org.unidal.initialization.ModuleContext;
import com.dianping.cat.abtest.ABTestManager;
import com.dianping.cat.agent.MmapConsumerTask;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.configuration.ClientConfigReloader;
import com.dianping.cat.configuration.client.entity.ClientConfig;
......@@ -48,8 +49,8 @@ public class CatClientModule extends AbstractModule {
// bring up TransportManager
ctx.lookup(TransportManager.class);
// start status update task
if (clientConfigManager.isCatEnabled()) {
// start status update task
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
Threads.forGroup("Cat").start(statusUpdateTask);
......@@ -60,9 +61,12 @@ public class CatClientModule extends AbstractModule {
if (config != null) {
Threads.forGroup("Cat").start(new ClientConfigReloader(clientConfigFile.getAbsolutePath(), config));
}
}
ABTestManager.initialize();
MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
Threads.forGroup("Cat").start(mmapReaderTask);
ABTestManager.initialize();
}
}
@Override
......
package com.dianping.cat.agent;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.internal.DefaultEvent;
import com.dianping.cat.message.internal.DefaultMessageManager;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import com.site.helper.Splitters;
public class MmapConsumerTask implements Task, Initializable, LogEnabled {
@Inject
private ClientConfigManager m_configManager;
@Inject
private DefaultMessageManager m_messageManager;
private QueueDescriptor m_descriptor;
private QueueReader m_reader;
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public String getName() {
return getClass().getSimpleName();
}
public void initialize() {
String mmapName = m_configManager.getMmapName();
m_descriptor = new QueueDescriptor(new File(mmapName + ".idx"));
m_reader = new QueueReader(new File(mmapName + ".dat"));
}
@Override
public void run() {
Cat.setup(null);
try {
m_descriptor.ensureOpen(0, 24);
m_reader.ensureOpen(m_descriptor.getReaderIndex(), m_descriptor.getQueueSize());
while (true) {
MessageTree tree = m_reader.next();
if (tree == null) {
break;
}
m_messageManager.flush(tree);
}
} catch (InterruptedException e) {
// ignore it
} finally {
Cat.reset();
}
}
@Override
public void shutdown() {
}
class QueueDescriptor {
private File m_file;
private MappedByteBuffer m_buffer;
public QueueDescriptor(File file) {
m_file = file;
}
public void ensureOpen(long position, long size) throws InterruptedException {
boolean first = true;
while (true) {
if (m_file.canRead()) {
if (m_buffer == null && position < size) {
RandomAccessFile raf = null;
try {
if (first) {
m_logger.info(String.format("Opening mmap index file %s ...", m_file.getCanonicalPath()));
}
raf = new RandomAccessFile(m_file, "rw");
m_buffer = raf.getChannel().map(MapMode.READ_WRITE, 0, size);
m_buffer.load();
m_buffer.position((int) position);
m_buffer.order(ByteOrder.LITTLE_ENDIAN);
break;
} catch (IOException e) {
if (first) {
e.printStackTrace();
first = false;
}
} finally {
try {
raf.close(); // we don't need it any more
} catch (IOException e) {
// ignore it
}
}
}
}
TimeUnit.MILLISECONDS.sleep(100); // sleep 100ms
}
}
public long getQueueSize() {
return m_buffer.getLong(0);
}
public long getReaderIndex() {
return m_buffer.getLong(16);
}
public long getWriterIndex() {
return m_buffer.getLong(8);
}
public void setReaderIndex(long newIndex) {
m_buffer.putLong(16, newIndex);
}
@Override
public String toString() {
return String.format("%s[size=%s, writerIndex=%s, readerIndex=%s, file=%s]", getClass().getSimpleName(), getQueueSize(),
getWriterIndex(), getReaderIndex(), m_file);
}
}
class QueueReader {
private File m_file;
private MappedByteBuffer m_buffer;
public QueueReader(File file) {
m_file = file;
}
public void ensureOpen(long position, long size) throws InterruptedException {
boolean first = true;
while (true) {
if (m_file.canRead()) {
if (m_buffer == null) {
RandomAccessFile raf = null;
try {
if (first) {
m_logger.info(String.format("Opening mmap data file %s ...", m_file.getCanonicalPath()));
}
raf = new RandomAccessFile(m_file, "r");
m_buffer = raf.getChannel().map(MapMode.READ_ONLY, 0, size);
m_buffer.load();
m_buffer.position((int) position);
m_buffer.order(ByteOrder.LITTLE_ENDIAN);
break;
} catch (IOException e) {
if (first) {
e.printStackTrace();
first = false;
}
} finally {
try {
raf.close(); // we don't need it any more
} catch (IOException e) {
// ignore it
}
}
}
}
TimeUnit.MILLISECONDS.sleep(100); // sleep 100ms
}
}
private String getDomainByMessageId(String id, String defaultValue) {
try {
return MessageId.parse(id).getDomain();
} catch (RuntimeException e) {
return defaultValue;
}
}
private DefaultEvent newEvent(String type, String name, String data) {
DefaultEvent event = new DefaultEvent(type, name);
if (data != null) {
event.addData(data);
}
event.setStatus(Message.SUCCESS);
event.complete();
return event;
}
public MessageTree next() throws InterruptedException {
MessageTree tree = null;
List<String> list = new ArrayList<String>(16);
StringBuilder sb = new StringBuilder(1024);
String childId = null;
int step = 0;
while (true) {
sb.setLength(0);
readLine(sb);
if (sb.length() == 0) {
break;
}
String line = sb.toString();
list.clear();
Splitters.by('\t').split(line, list);
int size = list.size();
int index = 0;
switch (step) {
case 0:
if (size >= 3) {
String id = list.get(index++);
String parentId = list.get(index++);
String rootId = list.get(index++);
childId = id;
tree = m_messageManager.getThreadLocalMessageTree().copy();
tree.setMessageId(parentId);
tree.setParentMessageId(rootId);
tree.setRootMessageId(rootId);
tree.setDomain(getDomainByMessageId(id, tree.getDomain()));
step++;
}
break;
case 1:
if (size >= 13) {
String name = list.get(index++);
String status = list.get(index++);
String url = list.get(index++);
String requestHeaderLen = list.get(index++);
String upstreamUrl = list.get(index++);
String responseHeaderLen = list.get(index++);
String responseBodyLen = list.get(index++);
String responseBodyBlocks = list.get(index++);
long t0 = toLong(list.get(index++));
long t1 = toLong(list.get(index++));
long t2 = toLong(list.get(index++));
long t3 = toLong(list.get(index++));
long t4 = toLong(list.get(index++));
DefaultTransaction t = new DefaultTransaction(name, url, m_messageManager);
t.addChild(newEvent(name + ".Status", status, null));
t.addChild(newEvent("RemoteCall", upstreamUrl, childId));
t.addData("_m", (t1 - t0) + "," + (t2 - t1) + "," + (t3 - t2) + "," + (t4 - t3));
t.addData("in", requestHeaderLen);
t.addData("out", responseHeaderLen + "," + responseBodyLen);
t.addData("blocks", responseBodyBlocks);
t.addData("url", upstreamUrl);
if ("200".equals(status)) {
t.setStatus(Message.SUCCESS);
} else {
t.setStatus(status);
}
t.setDurationInMillis(t4 - t0);
t.setCompleted(true);
tree.setMessage(t);
step++;
}
break;
default:
// shouldn't go here
System.err.println("Unexpected line: " + line + ".");
break;
}
}
return tree;
}
private String readLine(StringBuilder sb) throws InterruptedException {
int size = m_buffer.limit();
long readerIndex = m_descriptor.getReaderIndex();
try {
while (true) {
long writerIndex = m_descriptor.getWriterIndex();
if (readerIndex == writerIndex) {
// buffer is empty
TimeUnit.MILLISECONDS.sleep(10);
continue;
}
if (readerIndex > writerIndex) {
while (readerIndex < size) {
byte b = m_buffer.get();
readerIndex++;
if (b == '\n') {
return sb.toString();
} else {
sb.append((char) (b & 0xFF));
}
}
readerIndex = 0;
m_buffer.rewind();
}
while (readerIndex < writerIndex) {
byte b = m_buffer.get();
readerIndex++;
if (b == '\n') {
return sb.toString();
} else {
sb.append((char) (b & 0xFF));
}
}
}
} finally {
m_descriptor.setReaderIndex(readerIndex);
}
}
private long toLong(String str) {
long value = 0;
int len = str == null ? 0 : str.length();
for (int i = 0; i < len; i++) {
char ch = str.charAt(i);
value = value * 10 + (ch - '0');
}
return value;
}
@Override
public String toString() {
return String.format("%s[file=%s]", getClass().getSimpleName(), m_file);
}
}
}
......@@ -8,6 +8,7 @@ import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.CatClientModule;
import com.dianping.cat.agent.MmapConsumerTask;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.internal.DefaultMessageManager;
......@@ -26,6 +27,10 @@ import com.dianping.cat.message.spi.internal.DefaultMessageStatistics;
import com.dianping.cat.status.StatusUpdateTask;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
......@@ -53,6 +58,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(StatusUpdateTask.class) //
.req(MessageStatistics.class, ClientConfigManager.class));
all.add(C(MmapConsumerTask.class) //
.req(ClientConfigManager.class, MessageManager.class));
all.add(C(Module.class, CatClientModule.ID, CatClientModule.class));
all.addAll(new CodecComponentConfigurator().defineComponents());
......@@ -60,8 +68,4 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
return all;
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
}
......@@ -13,6 +13,7 @@ import org.unidal.helper.Files;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.client.entity.ClientConfig;
import com.dianping.cat.configuration.client.entity.Domain;
import com.dianping.cat.configuration.client.entity.Property;
import com.dianping.cat.configuration.client.entity.Server;
import com.dianping.cat.configuration.client.transform.DefaultSaxParser;
......@@ -56,6 +57,38 @@ public class ClientConfigManager implements LogEnabled {
}
}
/**
* Return the max total message node size for the whole message, children after this limit will be split into another child
* message tree.
*
* @return
*/
public int getMaxMessageLength() {
if (m_config == null) {
return 1000;
} else {
return getDomain().getMaxMessageSize();
}
}
public String getMmapName() {
return getPropertyValue("mmap-name", "/data/appdatas/cat/mmap");
}
private String getPropertyValue(String name, String defaultValue) {
String value = defaultValue;
if (m_config != null) {
Property property = m_config.getProperties().get(name);
if (property != null) {
value = property.getText();
}
}
return value;
}
public List<Server> getServers() {
if (m_config == null) {
return Collections.emptyList();
......@@ -133,18 +166,4 @@ public class ClientConfigManager implements LogEnabled {
public boolean isInitialized() {
return m_config != null;
}
/**
* Return the max total message node size for the whole message, children after this limit will be split into another child
* message tree.
*
* @return
*/
public int getMaxMessageLength() {
if (m_config == null) {
return 1000;
} else {
return getDomain().getMaxMessageSize();
}
}
}
......@@ -99,7 +99,7 @@ public abstract class AbstractMessage implements Message {
return Message.SUCCESS.equals(m_status);
}
protected void setCompleted(boolean completed) {
public void setCompleted(boolean completed) {
m_completed = completed;
}
......
......@@ -76,7 +76,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
}
void flush(MessageTree tree) {
public void flush(MessageTree tree) {
MessageSender sender = m_transportManager.getSender();
if (sender != null && !shouldThrottle(tree)) {
......
......@@ -47,7 +47,7 @@ public class MessageIdFactory {
index = m_index++;
saveMark();
}
StringBuilder sb = new StringBuilder(m_domain.length() + 32);
sb.append(m_domain);
......@@ -92,6 +92,8 @@ public class MessageIdFactory {
String tmpDir = System.getProperty("java.io.tmpdir");
File mark = new File(tmpDir, "cat-" + domain + ".mark");
System.out.println("Message id mark file: " + mark.getCanonicalPath());
m_markFile = new RandomAccessFile(mark, "rw");
m_byteBuffer = m_markFile.getChannel().map(MapMode.READ_WRITE, 0, 20);
......
......@@ -47,20 +47,20 @@ public abstract class MockMessageBuilder {
return e;
}
protected MetricHolder m(String type, String name) {
MetricHolder e = new MetricHolder(type, name);
protected HeartbeatHolder h(String type, String name) {
HeartbeatHolder h = new HeartbeatHolder(type, name);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
e.setTimestampInMicros(parent.getCurrentTimestampInMicros());
h.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
return e;
return h;
}
protected MetricHolder m(String type, String name, String data) {
MetricHolder e = new MetricHolder(type, name, data);
protected MetricHolder m(String type, String name) {
MetricHolder e = new MetricHolder(type, name);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
......@@ -71,16 +71,16 @@ public abstract class MockMessageBuilder {
return e;
}
protected HeartbeatHolder h(String type, String name) {
HeartbeatHolder h = new HeartbeatHolder(type, name);
protected MetricHolder m(String type, String name, String data) {
MetricHolder e = new MetricHolder(type, name, data);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
h.setTimestampInMicros(parent.getCurrentTimestampInMicros());
e.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
return h;
return e;
}
protected TransactionHolder t(String type, String name, long durationInMillis) {
......@@ -94,9 +94,9 @@ public abstract class MockMessageBuilder {
m_stack.push(t);
return t;
}
protected TransactionHolder t(String type, String name,String data, long durationInMillis) {
TransactionHolder t = new TransactionHolder(type, name,data, durationInMillis);
protected TransactionHolder t(String type, String name, String data, long durationInMillis) {
TransactionHolder t = new TransactionHolder(type, name, data, durationInMillis);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
......@@ -129,6 +129,18 @@ public abstract class MockMessageBuilder {
m_data = data;
}
public void addData(String key, String value) {
if (m_data == null) {
m_data = key + "=" + value;
} else {
m_data = m_data + "&" + key + "=" + value;
}
}
public String getData() {
return m_data;
}
public String getName() {
return m_name;
}
......@@ -154,10 +166,6 @@ public abstract class MockMessageBuilder {
m_status = status;
}
public String getData() {
return m_data;
}
@Override
public void setTimestampInMicros(long timestampInMicros) {
m_timestampInMicros = timestampInMicros;
......@@ -192,34 +200,6 @@ public abstract class MockMessageBuilder {
}
}
protected static class MetricHolder extends AbstractMessageHolder {
private DefaultMetric m_metric;
public MetricHolder(String type, String name) {
super(type, name);
}
public MetricHolder(String type, String name, String data) {
super(type, name, data);
}
@Override
public Metric build() {
m_metric = new DefaultMetric(getType(), getName());
m_metric.setTimestamp(getTimestampInMillis());
m_metric.setStatus(getStatus());
m_metric.addData(getData());
m_metric.complete();
return m_metric;
}
public MetricHolder status(String status) {
setStatus(status);
return this;
}
}
protected static class HeartbeatHolder extends AbstractMessageHolder {
private DefaultHeartbeat m_heartbeat;
......@@ -250,6 +230,34 @@ public abstract class MockMessageBuilder {
public void setTimestampInMicros(long timestampInMicros);
}
protected static class MetricHolder extends AbstractMessageHolder {
private DefaultMetric m_metric;
public MetricHolder(String type, String name) {
super(type, name);
}
public MetricHolder(String type, String name, String data) {
super(type, name, data);
}
@Override
public Metric build() {
m_metric = new DefaultMetric(getType(), getName());
m_metric.setTimestamp(getTimestampInMillis());
m_metric.setStatus(getStatus());
m_metric.addData(getData());
m_metric.complete();
return m_metric;
}
public MetricHolder status(String status) {
setStatus(status);
return this;
}
}
protected class TransactionHolder extends AbstractMessageHolder {
private long m_durationInMicros;
......@@ -266,14 +274,13 @@ public abstract class MockMessageBuilder {
m_durationInMicros = durationInMicros;
}
public TransactionHolder(String type, String name,String data, long durationInMicros) {
super(type, name,data);
public TransactionHolder(String type, String name, String data, long durationInMicros) {
super(type, name, data);
m_durationInMicros = durationInMicros;
}
public TransactionHolder after(long periodInMicros) {
m_currentTimestampInMicros += periodInMicros;
return this;
......@@ -311,6 +318,11 @@ public abstract class MockMessageBuilder {
return this;
}
public TransactionHolder data(String key, String value) {
addData(key, value);
return this;
}
public long getCurrentTimestampInMicros() {
return m_currentTimestampInMicros;
}
......
......@@ -94,6 +94,18 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.agent.MmapConsumerTask</role>
<implementation>com.dianping.cat.agent.MmapConsumerTask</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.configuration.ClientConfigManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageManager</role>
</requirement>
</requirements>
</component>
<component>
<role>org.unidal.initialization.Module</role>
<role-hint>cat-client</role-hint>
......
package com.dianping.cat.agent;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import org.junit.Test;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.message.internal.MessageIdFactory;
public class MmapConsumerTaskTest extends ComponentTestCase {
private void createMessage(MessageIdFactory factory, StringBuilder sb, int i) {
String status = (i % 7 == 0) || (i % 11 == 0) ? "50" + (i % 3) : "200";
long t0 = System.currentTimeMillis();
// <id>\t<parent-id>\t<root-id>\n
sb.append(String.format("%s\t%s\t%s\n", factory.getNextId(), factory.getNextId(), factory.getNextId()));
// <name>\t<status>\t<url>\t<request-header-len>\t<upstream-url>\t<response-header-len>\t<response-body-len>\t<response-body-blocks>\t<t0>\t<t1>\t<t2>\t<3>\t<t4>\n
sb.append(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", //
"NginxTest", status, "http://url/here/" + i, i % 10, "http://upstream/url/here/" + (i % 3), //
i % 9, i % 8, i % 7, t0, t0 + i, t0 + 3 * i, t0 + 4 * i, t0 + 5 * i));
// \n
sb.append("\n");
}
@Test
public void generateDataFile() throws Exception {
File idx = new File("/data/appdatas/cat/mmap.idx");
File dat = new File("/data/appdatas/cat/mmap.dat");
MessageIdFactory factory = lookup(MessageIdFactory.class);
StringBuilder sb = new StringBuilder(8192);
factory.initialize("Cat");
for (int i = 0; i < 100; i++) {
createMessage(factory, sb, i);
}
FileWriter datWriter = new FileWriter(dat);
datWriter.write(sb.toString());
datWriter.close();
updateMmapIndex(idx, dat.length(), dat.length(), 0);
}
private void updateMmapIndex(File idx, long capacity, long writerIndex, long readerIndex) throws FileNotFoundException,
IOException {
RandomAccessFile raf = new RandomAccessFile(idx, "rw");
MappedByteBuffer buffer = raf.getChannel().map(MapMode.READ_WRITE, 0, 24);
buffer.order(ByteOrder.LITTLE_ENDIAN);
if (capacity > 0) {
buffer.putLong(0, capacity);
}
if (writerIndex >= 0) {
buffer.putLong(8, writerIndex);
}
if (readerIndex >= 0) {
buffer.putLong(16, readerIndex);
}
buffer.force();
raf.close();
}
@Test
public void updateWriterIndex() throws Exception {
File idx = new File("/data/appdatas/cat/mmap.idx");
MessageIdFactory factory = lookup(MessageIdFactory.class);
StringBuilder sb = new StringBuilder(8192);
factory.initialize("Cat");
for (int i = 0; i < 50; i++) {
createMessage(factory, sb, i);
}
updateMmapIndex(idx, -1, sb.length(), -1);
}
}
......@@ -3,7 +3,7 @@
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-consumer-advanced</artifactId>
......
......@@ -3,7 +3,7 @@
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-consumer</artifactId>
......
......@@ -107,7 +107,7 @@ public class DomainManager implements Initializable, LogEnabled {
@Override
public void initialize() throws InitializationException {
if (!m_manager.isLocalMode()) {
// if (!m_manager.isLocalMode()) {
try {
m_ipDomains.put(UNKNOWN_IP, UNKNOWN_PROJECT);
List<Hostinfo> infos = m_hostInfoDao.findAllIp(HostinfoEntity.READSET_FULL);
......@@ -126,7 +126,7 @@ public class DomainManager implements Initializable, LogEnabled {
}
Threads.forGroup("Cat").start(new ReloadDomainTask());
}
// }
}
public class ReloadDomainTask implements Task {
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-core</artifactId>
......
......@@ -17,6 +17,7 @@ import com.dianping.cat.message.internal.MockMessageBuilder;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.BufferWriter;
import com.site.helper.Splitters;
/**
* Local use only, do not use it over network since it only supports one-way encoding
......@@ -24,21 +25,27 @@ import com.dianping.cat.message.spi.codec.BufferWriter;
public class WaterfallMessageCodec implements MessageCodec, Initializable {
public static final String ID = "waterfall";
private static final String VERSION = "WF1"; // Waterfall version 1
private static final String VERSION = "WF2"; // Waterfall version 2
@Inject
private BufferWriter m_writer;
private BufferHelper m_bufferHelper;
private String[] m_colors = { "#0066ff", "#006699", "#006633", "#0033ff", "#003399", "#003333" };
private boolean m_mockMode = false;
protected int countTransactions(Transaction t) {
protected int calculateLines(Transaction t) {
int count = 1;
for (Message child : t.getChildren()) {
if (child instanceof Transaction) {
count += countTransactions((Transaction) child);
count += calculateLines((Transaction) child);
} else if (child instanceof Event) {
if (child.getType().equals("RemoteCall")) {
count++;
}
}
}
......@@ -73,7 +80,7 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
Ruler ruler = new Ruler((int) t.getDurationInMicros());
ruler.setWidth(1400);
ruler.setHeight(18 * countTransactions(t) + 10);
ruler.setHeight(18 * calculateLines(t) + 10);
ruler.setOffsetX(200);
ruler.setOffsetY(10);
......@@ -154,7 +161,6 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
String logviewId = String.valueOf(event.getData());
b.branch(locator, x, y, width, height);
x += locator.getLevel() * width;
b.tagWithText("text", "<a href='#'>[:: show ::]</a>", "x", x + 2, "y", y - 5, "font-size", "16", "stroke-width", "0", "fill",
"blue", "onclick", "popup('" + logviewId + "');");
......@@ -213,8 +219,9 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
for (int i = 0; i < len; i++) {
Message child = children.get(i);
locator.setLast(i == len - 1);
if (child instanceof Transaction) {
locator.setLast(i == len - 1);
count += encodeTransaction(tree, (Transaction) child, buf, locator, ruler);
} else if (child instanceof Event && "RemoteCall".equals(child.getType())) {
count += encodeRemoteCall(tree, (Event) child, buf, locator, ruler);
......@@ -228,16 +235,18 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
protected int encodeTransactionLine(MessageTree tree, Transaction t, ChannelBuffer buf, Locator locator, Ruler ruler) {
BufferHelper helper = m_bufferHelper;
XmlBuilder b = new XmlBuilder();
StringBuilder sb = b.getResult();
int width = 6;
int height = 18;
int x = 0;
int y = locator.getLine() * height + ruler.getOffsetY();
String rid = "r" + locator.getLine();
String tid = "t" + locator.getLine();
long t0 = tree.getMessage().getTimestamp();
long t1 = t.getTimestamp();
int rx = ruler.calcX((t1 - t0) * 1000);
int rw = ruler.calcWidth(t.getDurationInMicros() * 1000);
int[] segments = getTransactionDurationSegments(t);
b.branch(locator, x, y, width, height);
x += locator.getLevel() * width;
if (t.getStatus().equals("0")) {
......@@ -247,25 +256,95 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
}
b.add(t.getType()).newLine();
b.tag("set", "attributeName", "fill", "to", "red", "begin", rid + ".mouseover", "end", rid + ".mouseout");
b.tag("set", "attributeName", "fill", "to", "red", "begin", tid + ".mouseover", "end", tid + ".mouseout");
b.tag2("text");
long t0 = tree.getMessage().getTimestamp();
long t1 = t.getTimestamp();
long d = t.getDurationInMicros();
if (segments == null) {
String durationInMillis = String.format("%.2f %s", t.getDurationInMicros() / 1000.0, t.getName());
int rx = ruler.calcX((t1 - t0) * 1000);
int rw = ruler.calcX(d) - ruler.getOffsetX();
b.tag("rect", "x", rx + 1, "y", y - 15, "width", rw, "height", height - 2, "fill", "#0066ff", "opacity", "0.5");
b.tagWithText("text", durationInMillis, "x", rx + 5, "y", y - 3, "font-size", "11", "stroke-width", "0");
} else {
int index = 0;
b.tag("rect", "id", rid, "x", rx + 1, "y", y - 15, "width", rw, "height", height - 2, "fill", "#0066ff", "opacity", "0.5");
b.tagWithText("text", String.format("%.2f %s", t.getDurationInMicros() / 1000.0, t.getName()), "id", tid, "x", rx + 5, "y",
y - 3, "font-size", "11", "stroke-width", "0");
for (int segment : segments) {
int w = ruler.calcWidth(segment);
String durationInMillis = String.format("%.2f %s", segment / 1000.0 / 1000.0, index == 0 ? t.getName() : "");
String color = m_colors[index % m_colors.length];
int count = helper.write(buf, sb.toString());
b.tag("rect", "x", rx + 1, "y", y - 15, "width", w, "height", height - 2, "fill", color, "opacity", "0.5");
b.tagWithText("text", durationInMillis, "x", rx + 5, "y", y - 3, "font-size", "11", "stroke-width", "0");
index++;
rx += w;
}
}
b.tag("rect", "id", tid, "x", ruler.getOffsetX() + 1, "y", y - 15, "width", ruler.getWidth(), "height", height, "fill",
"#ffffff", "stroke-width", "0", "opacity", "0.01");
int count = helper.write(buf, b.getResult().toString());
return count;
}
private int[] getTransactionDurationSegments(Transaction t) {
String data = t.getData().toString();
if (data.startsWith("_m=")) {
int pos = data.indexOf('&');
String str;
if (pos < 0) {
str = data.substring(3);
} else {
str = data.substring(3, pos);
}
List<String> parts = Splitters.by(',').split(str);
int len = parts.size();
int[] segments = new int[len];
for (int i = 0; i < len; i++) {
String part = parts.get(i);
try {
segments[i] = Integer.parseInt(part) * 1000;
} catch (Exception e) {
// ignore it
}
}
return segments;
} else if (data.startsWith("_u=")) {
int pos = data.indexOf('&');
String str;
if (pos < 0) {
str = data.substring(3);
} else {
str = data.substring(3, pos);
}
List<String> parts = Splitters.by(',').split(str);
int len = parts.size();
int[] segments = new int[len];
for (int i = 0; i < len; i++) {
String part = parts.get(i);
try {
segments[i] = Integer.parseInt(part);
} catch (Exception e) {
// ignore it
}
}
return segments;
} else {
return null;
}
}
protected List<Message> getVisibleChildren(Transaction parent) {
List<Message> children = new ArrayList<Message>();
......@@ -280,6 +359,10 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
return children;
}
public boolean isMockMode() {
return m_mockMode;
}
@Override
public void initialize() throws InitializationException {
m_bufferHelper = new BufferHelper(m_writer);
......@@ -300,9 +383,9 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
.mark().after(200).child(t("MEMCACHED", "Inc", 319)) //
.reset().after(500).child(t("BIG ASS SERVICE", "getThemDatar", 97155) //
.after(1000).mark().child(t("SERVICE", "getStuff", 63760)) //
.child(e("RemoteCall", "mock", "mock-message-id")) //
.reset().child(t("DATAR", "findThings", 94537)) //
.reset().child(t("DATAR", "findThings", 94537).data("_m", "10000,30000,15000,39537")) //
.after(200).child(t("THINGIE", "getMoar", 1435)) //
.child(e("RemoteCall", "mock", "mock-message-id")) //
) //
.after(100).mark().child(t("OTHER DATA SERVICE", "get", 4394) //
.after(800).mark().child(t("MEMCACHED", "Get", 378)) //
......@@ -311,7 +394,7 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
.reset().child(t("FINAL DATA SERVICE", "get", 1902) //
.after(1000).mark().child(t("MEMCACHED", "Get", 386)) //
.reset().child(t("MEMCACHED", "Get", 322)) //
.reset().child(t("MEMCACHED", "Get", 322)) //
.reset().child(t("MEMCACHED", "Get", 542)) //
) //
) //
;
......@@ -674,16 +757,26 @@ public class WaterfallMessageCodec implements MessageCodec, Initializable {
}
}
public int calcX(long value) {
int w = (int) (value * getUnit() / m_unitStep);
public int calcX(long timeInMillis) {
int w = (int) (timeInMillis * getUnit() / m_unitStep);
if (w == 0 && value > 0) {
if (w == 0 && timeInMillis > 0) {
w = 1;
}
return w + m_offsetX;
}
public int calcWidth(long timeInMicros) {
int w = (int) (timeInMicros * getUnit() / m_unitStep / 1000);
if (w == 0 && timeInMicros > 0) {
w = 1;
}
return w;
}
public int getHeight() {
return m_height;
}
......
......@@ -5,6 +5,7 @@ import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import com.dianping.cat.message.spi.core.HtmlMessageCodecTest;
import com.dianping.cat.message.spi.core.WaterfallMessageCodecTest;
import com.dianping.cat.storage.dump.LocalMessageBucketManagerTest;
import com.dianping.cat.storage.dump.LocalMessageBucketTest;
import com.dianping.cat.storage.report.LocalReportBucketTest;
......@@ -15,14 +16,17 @@ import com.dianping.cat.task.TaskManagerTest;
HtmlMessageCodecTest.class,
/* .storage */
LocalReportBucketTest.class,
WaterfallMessageCodecTest.class,
/* .storage.dump */
LocalMessageBucketTest.class,
LocalMessageBucketManagerTest.class,
/* .storage.report */
LocalReportBucketTest.class,
/* .task */
TaskManagerTest.class
})
......
......@@ -3,10 +3,19 @@ package com.dianping.cat.message.spi.core;
import junit.framework.Assert;
import org.junit.Test;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.core.WaterfallMessageCodec.Ruler;
public class WaterfallMessageCodecTest {
public class WaterfallMessageCodecTest extends ComponentTestCase {
@Test
public void testNotMockMode() throws Exception {
WaterfallMessageCodec codec = (WaterfallMessageCodec) lookup(MessageCodec.class, WaterfallMessageCodec.ID);
Assert.assertEquals("WaterfallMessageCodec is in mock mode.", false, codec.isMockMode());
}
@Test
public void testRuler() {
checkRuler(0, 0, 1);
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-hadoop</artifactId>
......
......@@ -3,7 +3,7 @@
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-home</artifactId>
......
......@@ -17,7 +17,7 @@ public class ServiceBuilderTest extends ComponentTestCase{
DomainManager manager = lookup(DomainManager.class);
manager.initialize();
builder.buildHourlyTask("service", "cat", new SimpleDateFormat("yyyyMMddHH").parse("2013071210"));
builder.buildHourlyTask("service", "cat", new SimpleDateFormat("yyyyMMddHH").parse("2013082011"));
}
@Test
......
package com.dianping.cat.report.task.service;
import org.junit.Assert;
import org.junit.Test;
import org.unidal.webres.helper.Files;
import com.dianping.cat.home.service.entity.ServiceReport;
import com.dianping.cat.home.service.transform.DefaultSaxParser;
public class ServiceReportTest {
@Test
public void testServiceReportMerge() throws Exception {
String oldXml = Files.forIO().readFrom(getClass().getResourceAsStream("ServiceReportOld.xml"), "utf-8");
String newXml = Files.forIO().readFrom(getClass().getResourceAsStream("ServiceReportNew.xml"), "utf-8");
ServiceReport reportOld = DefaultSaxParser.parse(oldXml);
ServiceReport reportNew = DefaultSaxParser.parse(newXml);
String expected = Files.forIO().readFrom(getClass().getResourceAsStream("ServiceReportResult.xml"),
"utf-8");
ServiceReportMerger merger = new ServiceReportMerger(new ServiceReport(reportOld.getDomain()));
reportOld.accept(merger);
reportNew.accept(merger);
Assert.assertEquals("Check the merge result!", expected.replace("\r", ""), merger.getServiceReport()
.toString().replace("\r", ""));
Assert.assertEquals("Source report is changed!", newXml.replace("\r", ""), reportNew.toString().replace("\r", ""));
}
}
<?xml version="1.0" encoding="utf-8"?>
<service-report domain="Cat">
<domain id="ShoppicWeb">
<totalCount>100</totalCount>
<failureCount>5</failureCount>
<failurePercent>0.05</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb1">
<totalCount>100</totalCount>
<failureCount>5</failureCount>
<failurePercent>0.05</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb2">
<totalCount>100</totalCount>
<failureCount>5</failureCount>
<failurePercent>0.05</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
</service-report>
<?xml version="1.0" encoding="utf-8"?>
<service-report domain="Cat">
<domain id="ShoppicWeb2">
<totalCount>100</totalCount>
<failureCount>2</failureCount>
<failurePercent>0.02</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb3">
<totalCount>100</totalCount>
<failureCount>2</failureCount>
<failurePercent>0.02</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb4">
<totalCount>100</totalCount>
<failureCount>2</failureCount>
<failurePercent>0.02</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
</service-report>
<?xml version="1.0" encoding="utf-8"?>
<service-report domain="Cat">
<domain id="ShoppicWeb2">
<totalCount>200</totalCount>
<failureCount>7</failureCount>
<failurePercent>0.035</failurePercent>
<sum>400.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb3">
<totalCount>100</totalCount>
<failureCount>2</failureCount>
<failurePercent>0.02</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb4">
<totalCount>100</totalCount>
<failureCount>2</failureCount>
<failurePercent>0.02</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb">
<totalCount>100</totalCount>
<failureCount>5</failureCount>
<failurePercent>0.05</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
<domain id="ShoppicWeb1">
<totalCount>100</totalCount>
<failureCount>5</failureCount>
<failurePercent>0.05</failurePercent>
<sum>200.0</sum>
<avg>2.0</avg>
<qps>0.0</qps>
</domain>
</service-report>
......@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.6.2</version>
<version>0.6.2-SNAPSHOT</version>
<name>CAT</name>
<description>Central Application Tracking</description>
<packaging>pom</packaging>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册