Commit 51e55053 authored by Frankie Wu

add cat-job project

Parent d442dd3d
package com.dianping.cat.storage;
import java.util.List;
import com.dianping.cat.message.spi.MessageTree;
/**
* Maps to one HDFS directory for one report.
*/
public interface MessageBucket extends Bucket<MessageTree> {
public void close();
public boolean storeById(String id, MessageTree value, String... tags);
public List<String> findAllIdsByTag(String tag);
// tag: "thread:101", "session:abc", "request:xyz", "parent:xxx"
public MessageTree findNextById(String id, Direction direction, String tag);
public static enum Direction {
FORWARD,
BACKWARD;
}
}
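For orientation, the sketch below shows how a caller might use the MessageBucket contract above. It is not part of this commit; the helper class, method names, and the way a MessageBucket instance is obtained are hypothetical.

package com.dianping.cat.storage.demo;

import java.util.List;

import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.MessageBucket;
import com.dianping.cat.storage.MessageBucket.Direction;

// Hypothetical caller illustrating the MessageBucket contract declared above.
public class MessageBucketUsage {
   // Store a tree under its id and tag it so it can later be found by thread or session.
   public static void save(MessageBucket bucket, String id, MessageTree tree) {
      bucket.storeById(id, tree, "thread:101", "session:abc");
   }

   // List every message id carrying a tag, then step forward from the first one.
   public static MessageTree nextForTag(MessageBucket bucket, String tag) {
      List<String> ids = bucket.findAllIdsByTag(tag);

      if (ids.isEmpty()) {
         return null;
      }

      return bucket.findNextById(ids.get(0), Direction.FORWARD, tag);
   }
}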
......
@@ -15,6 +15,10 @@
<groupId>com.dianping.cat</groupId>
<artifactId>cat-consumer</artifactId>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-job</artifactId>
</dependency>
<dependency>
<groupId>com.site.dal</groupId>
<artifactId>dal-xml</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-job</artifactId>
<name>CAT Job Analysis</name>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.203.0</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.7</version>
<executions>
<execution>
<id>generate plexus component descriptor</id>
<phase>process-classes</phase>
<goals>
<goal>plexus</goal>
</goals>
<configuration>
<className>com.dianping.cat.job.plexus.ComponentsConfigurator</className>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<index>true</index>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.dianping.cat.job.job.BrowserAnalyzer</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
package com.dianping.cat.job;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class HdfsDumpConsumer implements MessageConsumer {
public static final String ID = "dump-to-hdfs";
@Inject
private MessageStorage m_storage;
@Inject
private String m_domain;
@Override
public void consume(MessageTree tree) {
if (m_domain == null || m_domain.equals(tree.getDomain())) {
m_storage.store(tree);
}
}
@Override
public String getConsumerId() {
return ID;
}
@Override
public String getDomain() {
return m_domain;
}
public void setDomain(String domain) {
m_domain = domain;
}
}
package com.dianping.cat.job.build;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.job.HdfsDumpConsumer;
import com.dianping.cat.job.hdfs.OutputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannel;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.job.hdfs.OutputChannel;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
if (isEnv("dev") || property("env", null) == null) {
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
.config(E("maxSize").value(String.valueOf(2 * 1024 * 1024L))));
all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class));
} else {
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
.config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L))));
all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class) //
.config(E("baseDir").value("data"), //
E("serverUri").value("/catlog")));
}
all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
.req(OutputChannelManager.class));
all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) //
.req(MessageStorage.class, "hdfs"));
return all;
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class DefaultOutputChannel implements OutputChannel {
@Inject
private MessageCodec m_codec;
@Inject
private int m_maxSize = 0; // 0 means unlimited
@Inject
private long m_ttl = 90 * 1000L; // 90 seconds
private OutputStream m_out;
private int m_count;
private long m_timestamp;
@Override
public void close() {
if (m_out != null) {
try {
m_out.close();
m_out = null;
} catch (IOException e) {
// ignore it
}
}
}
@Override
public void initialize(OutputStream out) {
m_out = out;
m_timestamp = System.currentTimeMillis();
}
@Override
public boolean isExpired() {
long now = System.currentTimeMillis();
return now - m_timestamp > m_ttl;
}
public void setMaxSize(int maxSize) {
m_maxSize = maxSize;
}
public void setTtl(long ttl) {
m_ttl = ttl;
}
@Override
public boolean write(MessageTree tree) throws IOException {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
// the codec prefixes the encoded payload with its length; consume the 4-byte
// prefix so only the payload itself is written to the stream
int length = buf.readInt();
if (m_maxSize > 0 && m_count + length + 1 > m_maxSize) {
// writing this tree would exceed the max size; the caller should rotate the channel
return false;
}
buf.getBytes(buf.readerIndex(), m_out, length);
// an extra newline so consecutive message trees are separated by a blank line
m_out.write('\n');
m_count += length + 1;
return true;
}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled {
@Inject
private MessagePathBuilder m_builder;
@Inject
private String m_baseDir = "target/hdfs";
@Inject
private URI m_serverUri;
private FileSystem m_fs;
private Path m_basePath;
private String m_ipAddress;
private Map<String, OutputChannel> m_channels = new HashMap<String, OutputChannel>();
private Map<String, Integer> m_indexes = new HashMap<String, Integer>();
private Logger m_logger;
@Override
public void cleanupChannels() {
try {
List<String> expired = new ArrayList<String>();
for (Map.Entry<String, OutputChannel> e : m_channels.entrySet()) {
if (e.getValue().isExpired()) {
expired.add(e.getKey());
}
}
for (String path : expired) {
OutputChannel channel = m_channels.remove(path);
closeChannel(channel);
}
} catch (Exception e) {
m_logger.warn("Error when doing cleanup!", e);
}
}
@Override
public void closeAllChannels() {
for (OutputChannel channel : m_channels.values()) {
closeChannel(channel);
}
}
@Override
public void closeChannel(OutputChannel channel) {
channel.close();
super.release(channel);
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void initialize() throws InitializationException {
try {
Configuration config = new Configuration();
FileSystem fs;
config.setInt("io.file.buffer.size", 8192);
if (m_serverUri == null) {
fs = FileSystem.getLocal(config);
} else {
fs = FileSystem.get(m_serverUri, config); // TODO Not tested yet
}
m_fs = fs;
m_basePath = new Path(m_fs.getWorkingDirectory(), m_baseDir);
} catch (Exception e) {
throw new InitializationException("Error when getting HDFS file system.", e);
}
try {
InetAddress localHost = InetAddress.getLocalHost();
m_ipAddress = localHost.getHostAddress();
} catch (UnknownHostException e) {
m_logger.warn("Unable to get local host!", e);
}
}
@Override
public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException {
String path = m_builder.getHdfsPath(tree, m_ipAddress);
OutputChannel channel = m_channels.get(path);
if (channel == null) {
Path file = new Path(m_basePath, path + "-0");
OutputStream out = m_fs.create(file);
channel = lookup(OutputChannel.class);
channel.initialize(out);
m_indexes.put(path, 0);
m_channels.put(path, channel);
} else if (forceNew) {
int index = m_indexes.get(path);
closeChannel(channel);
m_indexes.put(path, ++index);
Path file = new Path(m_basePath, path + "-" + index);
OutputStream out = m_fs.create(file);
channel = lookup(OutputChannel.class);
channel.initialize(out);
m_channels.put(path, channel);
}
return channel;
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
public void setServerUri(String serverUri) {
m_serverUri = URI.create(serverUri);
}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class HdfsMessageStorage implements MessageStorage, Initializable, Disposable, LogEnabled {
@Inject
private OutputChannelManager m_manager;
private WriteJob m_job;
private Thread m_thread;
private Logger m_logger;
@Override
public void dispose() {
m_job.shutdown();
try {
m_thread.join();
} catch (InterruptedException e) {
// ignore it
}
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void initialize() throws InitializationException {
m_job = new WriteJob();
Thread thread = new Thread(m_job);
thread.setName("Storage write Job");
thread.start();
m_thread = thread;
}
@Override
public String store(MessageTree tree) {
m_job.append(tree);
// the write happens asynchronously, so no storage path is available to return
return null;
}
class WriteJob implements Runnable {
private BlockingQueue<MessageTree> m_queue = new LinkedBlockingQueue<MessageTree>();
private volatile boolean m_active = true;
public void append(MessageTree tree) {
try {
m_queue.offer(tree);
} catch (Exception e) {
m_logger.warn("Error when adding job to queue.", e);
}
}
private void handle(MessageTree tree) {
try {
OutputChannel channel = m_manager.openChannel(tree, false);
boolean success = channel.write(tree);
if (!success) {
m_manager.closeChannel(channel);
channel = m_manager.openChannel(tree, true);
channel.write(tree);
}
} catch (IOException e) {
m_logger.error("Error when writing to HDFS!", e);
}
}
private boolean isActive() {
synchronized (this) {
return m_active;
}
}
@Override
public void run() {
long lastCheckedTime = System.currentTimeMillis();
try {
while (isActive()) {
MessageTree tree = m_queue.poll(1000 * 1000L, TimeUnit.NANOSECONDS);
if (tree != null) {
handle(tree);
}
// check connection timeout and close it
if (System.currentTimeMillis() - lastCheckedTime >= 5 * 1000) {
lastCheckedTime = System.currentTimeMillis();
m_manager.cleanupChannels();
}
}
// process the remaining job in the queue
while (!isActive()) {
MessageTree tree = m_queue.poll();
if (tree != null) {
handle(tree);
} else {
break;
}
}
} catch (Exception e) {
m_logger.warn("Error when dumping message to HDFS.", e);
}
m_manager.closeAllChannels();
}
public void shutdown() {
synchronized (this) {
m_active = false;
}
}
}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import com.dianping.cat.message.spi.MessageTree;
public interface OutputChannel {
/**
* Close the channel.
*/
public void close();
/**
* Initialize the channel with an output stream.
*
* @param out
*/
public void initialize(OutputStream out);
/**
* Check if the channel is expired.
*
* @return true if the channel is expired, false otherwise.
*/
public boolean isExpired();
/**
* Output the message tree to the HDFS.
*
* @param tree
* @return false if the max size is reached, true otherwise.
* @throws IOException
*/
public boolean write(MessageTree tree) throws IOException;
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import com.dianping.cat.message.spi.MessageTree;
public interface OutputChannelManager {
public void cleanupChannels();
public void closeAllChannels();
public void closeChannel(OutputChannel channel);
public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException;
}
package com.dianping.cat.job.job;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.dianping.cat.job.mapreduce.MessageTreeInputFormat;
import com.dianping.cat.job.mapreduce.MessageTreeWritable;
import com.dianping.cat.message.Message;
import com.site.helper.Files;
public class BrowserAnalyzer extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new BrowserAnalyzer(), args);
System.exit(exitCode);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "browser analyzer");
job.setJarByClass(BrowserAnalyzer.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setInputFormatClass(MessageTreeInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("target/hdfs/20120215/17/null"));
FileOutputFormat.setOutputPath(job, new Path("target/browser"));
Files.forDir().delete(new File("target/browser"), true);
return job.waitForCompletion(true) ? 0 : 1;
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static class TokenizerMapper extends Mapper<Object, MessageTreeWritable, Text, IntWritable> {
private final static IntWritable ONE = new IntWritable(1);
private Text m_word = new Text();
public void map(Object key, MessageTreeWritable value, Context context) throws IOException, InterruptedException {
Message message = value.get().getMessage();
m_word.set(message.getType() + "." + message.getName());
context.write(m_word, ONE);
}
}
}
package com.dianping.cat.job.mapreduce;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class MessageTreeInputFormat extends FileInputFormat<LongWritable, MessageTreeWritable> {
@Override
public RecordReader<LongWritable, MessageTreeWritable> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new MessageTreeReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// the file is already small enough, so do not need to split it
return false;
}
}
package com.dianping.cat.job.mapreduce;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.codec.EscapingBufferWriter;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
public class MessageTreeReader extends RecordReader<LongWritable, MessageTreeWritable> {
private CompressionCodecFactory m_compressionCodecs;
private long m_start;
private long m_pos;
private long m_end;
private BlockReader m_in;
private LongWritable m_key;
private MessageTreeWritable m_value;
@Override
public void close() throws IOException {
if (m_in != null) {
m_in.close();
}
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
return m_key;
}
@Override
public MessageTreeWritable getCurrentValue() throws IOException, InterruptedException {
return m_value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
if (m_start == m_end) {
return 0;
} else {
return Math.min(1.0f, (m_pos - m_start) / (float) (m_end - m_start));
}
}
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration config = context.getConfiguration();
m_start = split.getStart();
m_end = m_start + split.getLength();
m_compressionCodecs = new CompressionCodecFactory(config);
// open the file and seek to the start of the split
Path file = split.getPath();
CompressionCodec codec = m_compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(config);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
m_in = new BlockReader(codec.createInputStream(fileIn), config);
m_end = Long.MAX_VALUE;
} else {
if (m_start != 0) {
skipFirstLine = true;
--m_start;
fileIn.seek(m_start);
}
m_in = new BlockReader(fileIn, config);
}
if (skipFirstLine) { // skip first line and re-establish "start".
m_start += m_in.readBlock(new MessageTreeWritable());
}
m_pos = m_start;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (m_key == null) {
m_key = new LongWritable();
}
m_key.set(m_pos);
if (m_value == null) {
m_value = new MessageTreeWritable();
}
int blockSize = 0;
if (m_pos < m_end) {
blockSize = m_in.readBlock(m_value);
m_pos += blockSize;
}
if (blockSize == 0) {
m_key = null;
m_value = null;
return false;
} else {
return true;
}
}
static class BlockReader {
private BufferedInputStream m_in;
private PlainTextMessageCodec m_codec;
public BlockReader(InputStream in, Configuration config) {
int bufferSize = config.getInt("io.file.buffer.size", 8192);
m_in = new BufferedInputStream(in, bufferSize);
m_codec = new PlainTextMessageCodec();
m_codec.setBufferWriter(new EscapingBufferWriter());
}
public void close() throws IOException {
m_in.close();
}
// Reads one message tree from the stream. Trees are separated by a blank line
// (two consecutive '\n' bytes, see DefaultOutputChannel.write), so this scans for
// that delimiter, hands the collected bytes to the codec, and returns the number
// of bytes consumed so the caller can track its position within the split.
public int readBlock(MessageTreeWritable tree) throws IOException {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
byte[] data = new byte[2048];
byte prev = 0;
int count = 0;
m_in.mark(Integer.MAX_VALUE);
int size = m_in.read(data);
int pos = 0;
while (size >= 0) {
if (pos >= size) {
// current chunk fully scanned without finding the delimiter: keep it and read the next chunk
buf.writeBytes(data, 0, size);
count += size;
m_in.mark(Integer.MAX_VALUE);
size = m_in.read(data);
pos = 0;
if (size < 0) {
break;
}
}
byte b = data[pos++];
if (b == '\n' && prev == '\n') {
// found the blank-line delimiter: keep everything before it and rewind the
// stream so the next call starts right after the delimiter
buf.writeBytes(data, 0, pos - 1);
count += pos;
m_in.reset();
m_in.skip(pos);
break;
}
prev = b;
}
m_codec.decode(buf, tree.get());
return count;
}
}
}
package com.dianping.cat.job.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class MessageTreeWritable implements Writable {
private MessageTree m_tree = new DefaultMessageTree();
public MessageTree get() {
return m_tree;
}
@Override
public void readFields(DataInput in) throws IOException {
throw new UnsupportedOperationException(
"This method should never be called; please check with the author if it is.");
}
@Override
public void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException(
"This method should never be called; please check with the author if it is.");
}
}
<plexus>
<components>
<component>
<role>com.dianping.cat.job.hdfs.OutputChannel</role>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannel</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<maxSize>2097152</maxSize>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
<implementation>com.dianping.cat.job.hdfs.HdfsMessageStorage</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dump-to-hdfs</role-hint>
<implementation>com.dianping.cat.job.HdfsDumpConsumer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
package com.dianping.cat.job.hdfs;
import org.junit.After;
import org.junit.Before;
import com.dianping.cat.Cat;
import com.site.lookup.ComponentTestCase;
public abstract class CatTestCase extends ComponentTestCase {
@Before
public void before() throws Exception {
Cat.initialize(getContainer(), null);
Cat.setup(null, null);
}
@After
public void after() throws Exception {
Cat.reset();
Cat.destroy();
}
}
\ No newline at end of file
package com.dianping.cat.job.hdfs;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
@RunWith(JUnit4.class)
public class HdfsMessageStorageTest extends CatTestCase {
@Test
public void test() throws Exception {
MessageStorage storage = lookup(MessageStorage.class, "hdfs");
MessageProducer producer = lookup(MessageProducer.class);
InMemoryQueue queue = lookup(InMemoryQueue.class);
for (int i = 0; i < 10000; i++) {
Transaction t = producer.newTransaction("URL", "MyPage" + (int) (i / 500));
try {
// do your business here
t.addData("k1", "v1");
t.addData("k2", "v2");
t.addData("k3", "v3");
Thread.sleep(1);
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
t.setStatus(e);
} finally {
t.complete();
}
MessageTree tree = queue.poll(0);
storage.store(tree);
}
((HdfsMessageStorage) storage).dispose();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.1.0</version>
<name>Tracking</name>
<packaging>pom</packaging>
<modules>
<module>cat-core</module>
<module>cat-consumer</module>
<module>cat-home</module>
<module>cat-hadoop</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-consumer</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-container-default</artifactId>
<version>1.0-alpha-47</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<version>1.0.0-a1</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>web-framework</artifactId>
<version>1.0.0-a3</version>
</dependency>
<dependency>
<groupId>org.unidal.webres</groupId>
<artifactId>WebResServer</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.site.dal</groupId>
<artifactId>dal-jdbc</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.site.app</groupId>
<artifactId>app-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.12</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.7.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.9</version>
<configuration>
<includes>
<include>**/AllTests.java</include>
</includes>
<!--
<debugForkedProcess>-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 -Xnoagent -Djava.compiler=NONE</debugForkedProcess>
-->
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>2.9</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
......
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.1.0</version>
<name>Tracking</name>
<packaging>pom</packaging>
<modules>
<module>cat-core</module>
<module>cat-consumer</module>
<module>cat-home</module>
<module>cat-job</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-consumer</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-job</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-container-default</artifactId>
<version>1.0-alpha-47</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>test-framework</artifactId>
<version>1.0.0-a1</version>
</dependency>
<dependency>
<groupId>com.site.common</groupId>
<artifactId>web-framework</artifactId>
<version>1.0.0-a3</version>
</dependency>
<dependency>
<groupId>org.unidal.webres</groupId>
<artifactId>WebResServer</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.site.dal</groupId>
<artifactId>dal-jdbc</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.site.app</groupId>
<artifactId>app-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
<version>6.1.9</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.12</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.7.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.9</version>
<configuration>
<includes>
<include>**/AllTests.java</include>
</includes>
<!-- <debugForkedProcess>-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 -Xnoagent -Djava.compiler=NONE</debugForkedProcess> -->
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>2.9</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<additionalConfig>
<file>
<name>.settings/org.eclipse.jdt.core.prefs</name>
<content><![CDATA[
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.source=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
]]></content>
</file>
<file>
<name>.settings/org.eclipse.core.resources.prefs</name>
<content>
<![CDATA[
eclipse.preferences.version=1
encoding/<project>=UTF-8
]]>
</content>
</file>
</additionalConfig>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1.2</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>package</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>releases</id>
<url>http://tech-wuqim.dianpingoa.com/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<url>http://tech-wuqim.dianpingoa.com/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<properties>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
</properties>
</project>