diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpAnalyzer.java new file mode 100644 index 0000000000000000000000000000000000000000..3ee1b70a0b9c7fd242528d73c939e0c0f3030805 --- /dev/null +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpAnalyzer.java @@ -0,0 +1,92 @@ +package com.dianping.cat.consumer.dump; + +import java.util.Collections; +import java.util.Date; +import java.util.Set; + +import org.codehaus.plexus.logging.LogEnabled; +import org.codehaus.plexus.logging.Logger; + +import com.dianping.cat.configuration.NetworkInterfaceManager; +import com.dianping.cat.message.spi.AbstractMessageAnalyzer; +import com.dianping.cat.message.spi.MessagePathBuilder; +import com.dianping.cat.message.spi.MessageTree; +import com.site.lookup.annotation.Inject; + +public class DumpAnalyzer extends AbstractMessageAnalyzer implements LogEnabled { + @Inject + private MessagePathBuilder m_builder; + + @Inject + private DumpChannelManager m_manager; + + private long m_extraTime; + + private long m_startTime; + + private long m_duration; + + private Logger m_logger; + + @Override + public void doCheckpoint(boolean atEnd) { + if (atEnd) { + m_manager.closeAllChannels(); + // TODO upload to remote HDFS + } + } + + @Override + public void enableLogging(Logger logger) { + m_logger = logger; + } + + @Override + public Set getDomains() { + return Collections.emptySet(); + } + + @Override + public Object getReport(String domain) { + throw new UnsupportedOperationException("This should not be called!"); + } + + @Override + protected boolean isTimeout() { + long currentTime = System.currentTimeMillis(); + long endTime = m_startTime + m_duration + m_extraTime; + + return currentTime > endTime; + } + + @Override + protected void process(MessageTree tree) { + if (tree.getMessage() == null) { + return; + } + + try { + String ipAddress = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); + long timestamp = tree.getMessage().getTimestamp(); + String domain = tree.getDomain(); + String path = m_builder.getMessagePath(domain + "-" + ipAddress, new Date(timestamp)); + DumpChannel channel = m_manager.openChannel(path, false); + int length = channel.write(tree); + + if (length <= 0) { + m_manager.closeChannel(channel); + + channel = m_manager.openChannel(path, true); + channel.write(tree); + } + } catch (Exception e) { + m_logger.error("Error when dumping to local file system!", e); + } + } + + public void setAnalyzerInfo(long startTime, long duration, long extraTime) { + m_extraTime = extraTime; + m_startTime = startTime; + m_duration = duration; + } +} diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpChannel.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpChannel.java new file mode 100644 index 0000000000000000000000000000000000000000..3ab6d2ac82d453846062b2111b4464ba1f8f4619 --- /dev/null +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpChannel.java @@ -0,0 +1,92 @@ +package com.dianping.cat.consumer.dump; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.zip.GZIPOutputStream; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import com.dianping.cat.message.spi.MessageCodec; +import com.dianping.cat.message.spi.MessageTree; +import com.site.helper.Files; + +public class DumpChannel { + private MessageCodec m_codec; + + private GZIPOutputStream m_out; + + private File m_file; + + private String m_path; + + private long m_maxSize; + + private long m_lastChunkAdjust = 100 * 1024L; // 100K + + public DumpChannel(MessageCodec codec, File baseDir, String path, long maxSize, long lastChunkAdjust) + throws IOException { + m_codec = codec; + m_file = new File(baseDir, path); + m_path = path; + m_maxSize = maxSize; + m_lastChunkAdjust = lastChunkAdjust; + + m_file.getParentFile().mkdirs(); + m_out = new GZIPOutputStream(new FileOutputStream(m_file)); + } + + public void close() { + try { + m_out.finish(); + m_out.close(); + } catch (IOException e) { + // ignore it + } + } + + public void moveTo(File anotherBase) throws IOException { + File target = new File(anotherBase, m_path); + + target.getParentFile().mkdirs(); + + boolean success = m_file.renameTo(target); + + if (!success) { + Files.forIO().copy(new FileInputStream(m_file), new FileOutputStream(target)); + m_file.delete(); + } + } + + public void setLastChunkAdjust(long lastChunkAdjust) { + m_lastChunkAdjust = lastChunkAdjust; + } + + public int write(MessageTree tree) throws IOException { + ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); + + m_codec.encode(tree, buf); + + int length = buf.readInt(); + long count = m_file.length(); + + if (m_maxSize > 0 && count + m_lastChunkAdjust + length > m_maxSize) { + // exceed the max size + return 0; + } + + buf.getBytes(buf.readerIndex(), m_out, length); + + // a blank line used to separate two message trees + m_out.write('\n'); + m_out.flush(); + + return length + 1; + } + + public File getFile() { + return m_file; + } +} diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpChannelManager.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpChannelManager.java new file mode 100644 index 0000000000000000000000000000000000000000..3d5fd2b232c4916b34f2636237fa34ac33ecc925 --- /dev/null +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/DumpChannelManager.java @@ -0,0 +1,147 @@ +package com.dianping.cat.consumer.dump; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.codehaus.plexus.logging.LogEnabled; +import org.codehaus.plexus.logging.Logger; +import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; +import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; + +import com.dianping.cat.configuration.ServerConfigManager; +import com.dianping.cat.configuration.server.entity.HdfsConfig; +import com.dianping.cat.configuration.server.entity.ServerConfig; +import com.dianping.cat.configuration.server.entity.StorageConfig; +import com.dianping.cat.message.spi.MessageCodec; +import com.site.lookup.ContainerHolder; +import com.site.lookup.annotation.Inject; + +public class DumpChannelManager extends ContainerHolder implements Initializable, LogEnabled { + private static final long DEFAULT_MAX_SIZE = 128 * 1024 * 1024L; + + @Inject + private MessageCodec m_codec; + + private Map m_channels = new HashMap(); + + private Map m_indexes = new HashMap(); + + private long m_maxSize = DEFAULT_MAX_SIZE / 1024 * 16; + + private long m_lastChunkAdjust = 100 * 1024L; // 100K + + private String m_baseDir = "target/dump"; + + private Logger m_logger; + + public void closeAllChannels() { + for (DumpChannel channel : m_channels.values()) { + closeChannel(channel); + } + + m_channels.clear(); + } + + public void closeChannel(DumpChannel channel) { + release(channel); + channel.close(); + + File outbox = new File(m_baseDir, "outbox"); + + outbox.mkdirs(); + + try { + channel.moveTo(outbox); + } catch (IOException e) { + m_logger.error(String.format("Error when moving file(%s) to directory(%s)!", channel.getFile(), outbox), e); + } + } + + @Override + public void initialize() throws InitializationException { + ServerConfigManager configManager = lookup(ServerConfigManager.class); + ServerConfig config = configManager.getServerConfig(); + + if (config != null) { + StorageConfig storage = config.getStorage(); + HdfsConfig hdfsConfig = storage.findHdfs("dump"); + + m_baseDir = storage.getLocalBaseDir(); + m_maxSize = toLong(hdfsConfig == null ? null : hdfsConfig.getMaxSize(), DEFAULT_MAX_SIZE); + } + } + + private DumpChannel makeChannel(String key, String path, boolean forceNew) throws IOException { + String file; + + if (forceNew) { + Integer index = m_indexes.get(key); + + if (index == null) { + index = 1; + } + + index++; + + file = path + "-" + index + ".gz"; + m_indexes.put(key, index); + } else { + file = path + ".gz"; + } + + DumpChannel channel = new DumpChannel(m_codec, new File(m_baseDir, "draft"), file, m_maxSize, m_lastChunkAdjust); + + m_channels.put(key, channel); + return channel; + } + + public DumpChannel openChannel(String path, boolean forceNew) throws IOException { + DumpChannel channel = m_channels.get(path); + + if (channel == null) { + synchronized (m_channels) { + channel = m_channels.get(path); + + if (channel == null) { + channel = makeChannel(path, path, false); + } + } + } else if (forceNew) { + synchronized (m_channels) { + channel = makeChannel(path, path, true); + } + } + + return channel; + } + + private long toLong(String str, long defaultValue) { + long value = 0; + int len = str == null ? 0 : str.length(); + + for (int i = 0; i < len; i++) { + char ch = str.charAt(i); + + if (Character.isDigit(ch)) { + value = value * 10L + (ch - '0'); + } else if (ch == 'm' || ch == 'M') { + value *= 1024 * 1024L; + } else if (ch == 'k' || ch == 'K') { + value *= 1024L; + } + } + + if (value > 0) { + return value; + } else { + return defaultValue; + } + } + + @Override + public void enableLogging(Logger logger) { + m_logger = logger; + } +} diff --git a/cat-consumer/src/test/java/com/dianping/cat/consumer/dump/DumpAnalyzerTest.java b/cat-consumer/src/test/java/com/dianping/cat/consumer/dump/DumpAnalyzerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8f0afa6e8b6bd7acc091046e23545e67fce752f8 --- /dev/null +++ b/cat-consumer/src/test/java/com/dianping/cat/consumer/dump/DumpAnalyzerTest.java @@ -0,0 +1,66 @@ +package com.dianping.cat.consumer.dump; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import com.dianping.cat.consumer.AnalyzerFactory; +import com.dianping.cat.message.Transaction; +import com.dianping.cat.message.internal.DefaultTransaction; +import com.dianping.cat.message.io.DefaultMessageQueue; +import com.dianping.cat.message.spi.MessageAnalyzer; +import com.dianping.cat.message.spi.internal.DefaultMessageTree; +import com.site.lookup.ComponentTestCase; + +@RunWith(JUnit4.class) +public class DumpAnalyzerTest extends ComponentTestCase { + @Test + public void test() throws Exception { + AnalyzerFactory factory = lookup(AnalyzerFactory.class); + long now = 1334041324150L; + DefaultMessageQueue queue = new DefaultMessageQueue(); + int num = 1000000; + + queue.setSize(num); + queue.initialize(); + + for (int i = 0; i < num; i++) { + queue.offer(newMessageTree(i, now + i * 10L)); + } + + MessageAnalyzer analyzer = factory.create("dump", now, 10 * 1000L, 10 * 1000L); + + analyzer.analyze(queue); + + analyzer.doCheckpoint(true); + } + + private DefaultMessageTree newMessageTree(int i, long timestamp) { + DefaultMessageTree tree = new DefaultMessageTree(); + + tree.setDomain("domain"); + tree.setHostName("hostName" + i); + tree.setIpAddress("ipAddress" + i); + tree.setMessageId(String.valueOf(i)); + tree.setParentMessageId("parentMessageId" + i); + tree.setRootMessageId("rootMessageId" + i); + tree.setSessionToken("sessionToken"); + tree.setThreadGroupName("threadGroupName"); + tree.setThreadId("threadId" + i); + tree.setThreadName("threadName"); + + tree.setMessage(newTransaction("type", "name" + i, timestamp, "0", 123456 + i, "data" + i)); + return tree; + } + + private Transaction newTransaction(String type, String name, long timestamp, String status, int duration, String data) { + DefaultTransaction transaction = new DefaultTransaction(type, name, null); + + transaction.setStatus(status); + transaction.addData(data); + transaction.complete(); + transaction.setTimestamp(timestamp); + transaction.setDurationInMillis(duration); + return transaction; + } +}