Commit b4cff2e8 authored by sean.wang

add dump uploader

Parent cfe50349
@@ -14,6 +14,7 @@ import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.dump.DumpAnalyzer;
import com.dianping.cat.consumer.dump.DumpChannel;
import com.dianping.cat.consumer.dump.DumpChannelManager;
import com.dianping.cat.consumer.dump.DumpUploader;
import com.dianping.cat.consumer.event.EventAnalyzer;
import com.dianping.cat.consumer.ip.IpAnalyzer;
import com.dianping.cat.consumer.problem.ProblemAnalyzer;
@@ -23,6 +24,7 @@ import com.dianping.cat.consumer.problem.handler.Handler;
import com.dianping.cat.consumer.problem.handler.LongUrlHandler;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.hadoop.hdfs.FileSystemManager;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
@@ -38,41 +40,45 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactory.class));
all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
.req(AnalyzerFactory.class) //
.config(E("extraTime").value(property("extraTime", "300000"))//
, E("analyzers").value("problem,transaction,event,ip,dump")));
String errorTypes = "Error,RuntimeException,Exception";
String failureTypes = "URL,SQL,Call,Cache";
all.add(C(Handler.class, ERROR.getName(), ErrorHandler.class)//
.config(E("errorType").value(errorTypes)));
.config(E("errorType").value(errorTypes)));
all.add(C(Handler.class, FAILURE.getName(), FailureHandler.class)//
.config(E("failureType").value(failureTypes)));
.config(E("failureType").value(failureTypes)));
all.add(C(Handler.class, LONG_URL.getName(), LongUrlHandler.class) //
.req(ServerConfigManager.class));
all.add(C(ProblemAnalyzer.class).is(PER_LOOKUP) //
.req(Handler.class, new String[] { FAILURE.getName(), ERROR.getName(), LONG_URL.getName() }, "m_handlers") //
.req(BucketManager.class, ReportDao.class));
all.add(C(TransactionAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, ReportDao.class));
all.add(C(EventAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, ReportDao.class));
all.add(C(IpAnalyzer.class));
all.add(C(DumpAnalyzer.class).is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(DumpChannelManager.class));
all.add(C(DumpChannel.class));
all.add(C(DumpChannelManager.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(DumpUploader.class)//
.req(FileSystemManager.class)//
);
return all;
}
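
With this registration in place, DumpUploader can be resolved from the Plexus container and driven by hand. A minimal sketch of one manual upload pass (mirroring the test below; in production the component's own background thread calls upload() every few seconds):

// resolve the registered component, e.g. from a ContainerHolder/ComponentTestCase context
DumpUploader uploader = lookup(DumpUploader.class);
// scan the local dump directory and push each file to HDFS
uploader.upload();
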
......
package com.dianping.cat.consumer.dump;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.hadoop.hdfs.FileSystemManager;
import com.site.helper.Scanners;
import com.site.helper.Scanners.IMatcher;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
/**
 * Uploads locally dumped message files to HDFS from a background thread.
 *
 * @author sean.wang
 * @since Apr 10, 2012
 */
public class DumpUploader extends ContainerHolder implements Initializable, LogEnabled {
private static final int DEFAULT_CHECK_DURATION = 5 * 1000; // ms between upload scans
private String m_baseDir;
@Inject
private FileSystemManager m_fileSystemManager;
private Logger m_logger;
private Thread m_thread;
private WriteJob m_job;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
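// called by Plexus after dependency injection; starts the background upload thread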
@Override
public void initialize() throws InitializationException {
m_baseDir = m_fileSystemManager.getServerConfig().getStorage().getLocalBaseDir();
m_job = new WriteJob();
Thread thread = new Thread(m_job);
thread.setName("MessageDumpToHdfs");
thread.start();
m_thread = thread;
}
public void dispose() {
m_job.shutdown();
try {
m_thread.join();
} catch (InterruptedException e) {
// restore the interrupt flag instead of swallowing it
Thread.currentThread().interrupt();
}
}
private FSDataOutputStream makeHdfsOutputStream(String path) throws IOException {
StringBuilder baseDir = new StringBuilder(32);
String id = "dump";
String key = id + ":" + path;
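// getFileSystem() resolves the HDFS file system for this key and fills baseDir (an out-parameter) with its base path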
FileSystem fs = m_fileSystemManager.getFileSystem(key, id, path, baseDir);
Path file = new Path(baseDir.toString(), path);
FSDataOutputStream out = fs.create(file);
return out;
}
private void transfer(FileInputStream fis, FSDataOutputStream fdos) throws IOException {
byte[] buffer = new byte[10 * 1024];
int byteRead = -1;
while ((byteRead = fis.read(buffer)) != -1) {
fdos.write(buffer, 0, byteRead);
}
fdos.flush();
}
public void upload() {
File baseDir = new File(m_baseDir);
final List<String> paths = new ArrayList<String>();
Scanners.forDir().scan(baseDir, new IMatcher<File>() {
@Override
public boolean isDirEligible() {
return false;
}
@Override
public boolean isFileElegible() {
return true;
}
@Override
public Direction matches(File base, String path) {
paths.add(path);
return Direction.NEXT;
}
});
for (String path : paths) {
File file = new File(baseDir, path);
FSDataOutputStream fdos = null;
FileInputStream fis = null;
try {
fdos = makeHdfsOutputStream(path);
fis = new FileInputStream(file);
transfer(fis, fdos);
} catch (IOException e) {
m_logger.error("transfer file to hdfs fail", e);
continue;
} finally {
try {
if (fdos != null) {
fdos.close();
}
} catch (IOException e) {
// ignore close failures; the transfer result was already handled above
}
try {
if (fis != null) {
fis.close();
}
} catch (IOException e) {
// ignore close failures
}
}
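// remove the local file only after a successful upload; an IOException above skips this via continue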
file.delete();
}
}
private class WriteJob implements Runnable {
private volatile boolean m_active = true;
private boolean isActive() {
return m_active;
}
@Override
public void run() {
try {
while (isActive()) {
upload();
// sleep between scans so the loop does not spin on an empty directory
try {
Thread.sleep(DEFAULT_CHECK_DURATION);
} catch (InterruptedException e) {
break; // treat interruption as a shutdown signal
}
}
} catch (Exception e) {
m_logger.warn("Error when dumping message to HDFS.", e);
}
}
public void shutdown() {
m_active = false;
}
}
}
@@ -123,5 +123,14 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.consumer.dump.DumpUploader</role>
<implementation>com.dianping.cat.consumer.dump.DumpUploader</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
</requirement>
</requirements>
</component>
</components>
</plexus>
package com.dianping.cat.consumer.dump;
import org.junit.Test;
import com.dianping.cat.consumer.AnalyzerFactory;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.ComponentTestCase;
public class DumpUploaderTest extends ComponentTestCase {
@Test
public void testUpload() throws Exception {
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
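// a fixed timestamp (Apr 10, 2012) keeps the generated dump paths deterministic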
long now = 1334041324150L;
DefaultMessageQueue queue = new DefaultMessageQueue();
int num = 10000;
queue.setSize(num);
queue.initialize();
for (int i = 0; i < num; i++) {
queue.offer(newMessageTree(i, now + i * 10L));
}
MessageAnalyzer analyzer = factory.create("dump", now, 10 * 1000L, 10 * 1000L);
analyzer.analyze(queue);
analyzer.doCheckpoint(true);
DumpUploader uploader = lookup(DumpUploader.class);
uploader.upload();
}
private DefaultMessageTree newMessageTree(int i, long timestamp) {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setDomain("domain");
tree.setHostName("hostName" + i);
tree.setIpAddress("ipAddress" + i);
tree.setMessageId(String.valueOf(i));
tree.setParentMessageId("parentMessageId" + i);
tree.setRootMessageId("rootMessageId" + i);
tree.setSessionToken("sessionToken");
tree.setThreadGroupName("threadGroupName");
tree.setThreadId("threadId" + i);
tree.setThreadName("threadName");
tree.setMessage(newTransaction("type", "name" + i, timestamp, "0", 123456 + i, "data" + i));
return tree;
}
private Transaction newTransaction(String type, String name, long timestamp, String status, int duration, String data) {
DefaultTransaction transaction = new DefaultTransaction(type, name, null);
transaction.setStatus(status);
transaction.addData(data);
transaction.complete();
transaction.setTimestamp(timestamp);
transaction.setDurationInMillis(duration);
return transaction;
}
}
@@ -109,7 +109,7 @@
<implementation>com.dianping.cat.message.io.DefaultMessageQueue</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<size>1000</size>
<size>10000</size>
</configuration>
</component>
<component>
......