提交 bd2287bf 编写于 作者: F Frankie Wu

add missing files

上级 a8841783
package com.dianping.cat.consumer.dump;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class DumpAnalyzer extends AbstractMessageAnalyzer<Object> implements LogEnabled {
private MessagePathBuilder m_builder;
private DumpChannelManager m_manager;
private long m_extraTime;
private long m_startTime;
private long m_duration;
private Logger m_logger;
public void doCheckpoint(boolean atEnd) {
if (atEnd) {
// TODO upload to remote HDFS
public void enableLogging(Logger logger) {
m_logger = logger;
public Set<String> getDomains() {
return Collections.emptySet();
public Object getReport(String domain) {
throw new UnsupportedOperationException("This should not be called!");
protected boolean isTimeout() {
long currentTime = System.currentTimeMillis();
long endTime = m_startTime + m_duration + m_extraTime;
return currentTime > endTime;
protected void process(MessageTree tree) {
if (tree.getMessage() == null) {
try {
String ipAddress = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
long timestamp = tree.getMessage().getTimestamp();
String domain = tree.getDomain();
String path = m_builder.getMessagePath(domain + "-" + ipAddress, new Date(timestamp));
DumpChannel channel = m_manager.openChannel(path, false);
int length = channel.write(tree);
if (length <= 0) {
channel = m_manager.openChannel(path, true);
} catch (Exception e) {
m_logger.error("Error when dumping to local file system!", e);
public void setAnalyzerInfo(long startTime, long duration, long extraTime) {
m_extraTime = extraTime;
m_startTime = startTime;
m_duration = duration;
package com.dianping.cat.consumer.dump;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.site.helper.Files;
public class DumpChannel {
private MessageCodec m_codec;
private GZIPOutputStream m_out;
private File m_file;
private String m_path;
private long m_maxSize;
private long m_lastChunkAdjust = 100 * 1024L; // 100K
public DumpChannel(MessageCodec codec, File baseDir, String path, long maxSize, long lastChunkAdjust)
throws IOException {
m_codec = codec;
m_file = new File(baseDir, path);
m_path = path;
m_maxSize = maxSize;
m_lastChunkAdjust = lastChunkAdjust;
m_out = new GZIPOutputStream(new FileOutputStream(m_file));
public void close() {
try {
} catch (IOException e) {
// ignore it
public void moveTo(File anotherBase) throws IOException {
File target = new File(anotherBase, m_path);
boolean success = m_file.renameTo(target);
if (!success) {
Files.forIO().copy(new FileInputStream(m_file), new FileOutputStream(target));
public void setLastChunkAdjust(long lastChunkAdjust) {
m_lastChunkAdjust = lastChunkAdjust;
public int write(MessageTree tree) throws IOException {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
int length = buf.readInt();
long count = m_file.length();
if (m_maxSize > 0 && count + m_lastChunkAdjust + length > m_maxSize) {
// exceed the max size
return 0;
buf.getBytes(buf.readerIndex(), m_out, length);
// a blank line used to separate two message trees
return length + 1;
public File getFile() {
return m_file;
package com.dianping.cat.consumer.dump;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.configuration.server.entity.HdfsConfig;
import com.dianping.cat.configuration.server.entity.ServerConfig;
import com.dianping.cat.configuration.server.entity.StorageConfig;
import com.dianping.cat.message.spi.MessageCodec;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DumpChannelManager extends ContainerHolder implements Initializable, LogEnabled {
private static final long DEFAULT_MAX_SIZE = 128 * 1024 * 1024L;
private MessageCodec m_codec;
private Map<String, DumpChannel> m_channels = new HashMap<String, DumpChannel>();
private Map<String, Integer> m_indexes = new HashMap<String, Integer>();
private long m_maxSize = DEFAULT_MAX_SIZE / 1024 * 16;
private long m_lastChunkAdjust = 100 * 1024L; // 100K
private String m_baseDir = "target/dump";
private Logger m_logger;
public void closeAllChannels() {
for (DumpChannel channel : m_channels.values()) {
public void closeChannel(DumpChannel channel) {
File outbox = new File(m_baseDir, "outbox");
try {
} catch (IOException e) {
m_logger.error(String.format("Error when moving file(%s) to directory(%s)!", channel.getFile(), outbox), e);
public void initialize() throws InitializationException {
ServerConfigManager configManager = lookup(ServerConfigManager.class);
ServerConfig config = configManager.getServerConfig();
if (config != null) {
StorageConfig storage = config.getStorage();
HdfsConfig hdfsConfig = storage.findHdfs("dump");
m_baseDir = storage.getLocalBaseDir();
m_maxSize = toLong(hdfsConfig == null ? null : hdfsConfig.getMaxSize(), DEFAULT_MAX_SIZE);
private DumpChannel makeChannel(String key, String path, boolean forceNew) throws IOException {
String file;
if (forceNew) {
Integer index = m_indexes.get(key);
if (index == null) {
index = 1;
file = path + "-" + index + ".gz";
m_indexes.put(key, index);
} else {
file = path + ".gz";
DumpChannel channel = new DumpChannel(m_codec, new File(m_baseDir, "draft"), file, m_maxSize, m_lastChunkAdjust);
m_channels.put(key, channel);
return channel;
public DumpChannel openChannel(String path, boolean forceNew) throws IOException {
DumpChannel channel = m_channels.get(path);
if (channel == null) {
synchronized (m_channels) {
channel = m_channels.get(path);
if (channel == null) {
channel = makeChannel(path, path, false);
} else if (forceNew) {
synchronized (m_channels) {
channel = makeChannel(path, path, true);
return channel;
private long toLong(String str, long defaultValue) {
long value = 0;
int len = str == null ? 0 : str.length();
for (int i = 0; i < len; i++) {
char ch = str.charAt(i);
if (Character.isDigit(ch)) {
value = value * 10L + (ch - '0');
} else if (ch == 'm' || ch == 'M') {
value *= 1024 * 1024L;
} else if (ch == 'k' || ch == 'K') {
value *= 1024L;
if (value > 0) {
return value;
} else {
return defaultValue;
public void enableLogging(Logger logger) {
m_logger = logger;
package com.dianping.cat.consumer.dump;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.consumer.AnalyzerFactory;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.ComponentTestCase;
public class DumpAnalyzerTest extends ComponentTestCase {
public void test() throws Exception {
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
long now = 1334041324150L;
DefaultMessageQueue queue = new DefaultMessageQueue();
int num = 1000000;
for (int i = 0; i < num; i++) {
queue.offer(newMessageTree(i, now + i * 10L));
MessageAnalyzer analyzer = factory.create("dump", now, 10 * 1000L, 10 * 1000L);
private DefaultMessageTree newMessageTree(int i, long timestamp) {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setHostName("hostName" + i);
tree.setIpAddress("ipAddress" + i);
tree.setParentMessageId("parentMessageId" + i);
tree.setRootMessageId("rootMessageId" + i);
tree.setThreadId("threadId" + i);
tree.setMessage(newTransaction("type", "name" + i, timestamp, "0", 123456 + i, "data" + i));
return tree;
private Transaction newTransaction(String type, String name, long timestamp, String status, int duration, String data) {
DefaultTransaction transaction = new DefaultTransaction(type, name, null);
return transaction;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册