Commit 270eb4e0 authored by Frankie Wu

Merge branch 'master' of ssh://192.168.8.22:58422/cat

package com.dianping.cat;
public class CatConstants {
/**
* Config File
*/
public static final String CONFIG_FILE = "/data/appdatas/cat/client.xml";
/**
* Pigeon Transaction Type
*/
public static final String TYPE_CALL = "Call";
public static final String TYPE_RESULT = "Result";
public static final String TYPE_TimeOut = "PigeonTimeOut";
public static final String TYPE_SERVICE = "Service";
public static final String TYPE_REMOTE_CALL = "RemoteCall";
/**
* Pigeon Event name
*/
public static final String NAME_REQUEST = "PigeonRequest";
public static final String NAME_RESPONSE = "PigeonRespone";
public static final String NAME_PAYLOAD = "Payload";
public static final String NAME_TIME_OUT = "ClientTimeOut";
/**
* Pigeon Context Info
*/
public static final String PIGEON_ROOT_MESSAGE_ID = "RootMessageId";
public static final String PIGEON_CURRENT_MESSAGE_ID = "CurrentMessageId";
public static final String PIGEON_SERVER_MESSAGE_ID = "ServerMessageId";
public static final String PIGEON_RESPONSE_MESSAGE_ID = "ResponseMessageId";
/**
* Zebra
*/
public static final String TYPE_SQL = "SQL";
public static final String TYPE_SQL_PARAM = "SQL.PARAM";
/**
* Avatar
*/
public static final String TYPE_URL = "URL";
public static final String NAME_CLIENT_INFO = "ClientInfo";
public static final String TYPE_ACTION = "Action";
}
\ No newline at end of file
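
For illustration only, not part of this commit: these constants are meant to replace string literals at instrumentation points, which is exactly what the CatFilter hunk further down does. A minimal sketch of a hand-instrumented code path, assuming Cat.setup(...) has already been called for the current thread (as CatFilter does); the class name, URI and payload string are made up:

import com.dianping.cat.Cat;
import com.dianping.cat.CatConstants;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;

public class CatConstantsUsageSketch {
    public void handleRequest() {
        MessageProducer cat = Cat.getProducer();
        Transaction t = cat.newTransaction(CatConstants.TYPE_URL, "/index.jsp");
        try {
            // business logic would run here
            cat.logEvent(CatConstants.TYPE_URL, CatConstants.NAME_PAYLOAD, Event.SUCCESS, "GET /index.jsp");
            t.setStatus(Transaction.SUCCESS);
        } catch (Exception e) {
            t.setStatus(e);
        } finally {
            t.complete();
        }
    }
}
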
package com.dianping.cat.message.spi;
/**
* @author franke.wu
* @author sean.wang
* @since Mar 6, 2012
*/
public interface MessageStorage {
/**
* Store a message tree to the storage.
@@ -11,6 +16,15 @@ public interface MessageStorage {
public String store(MessageTree tree);
/**
* Fetch a message tree from the store.
*
* @param messageId the message id
* @return the message tree for that id
*/
public MessageTree get(String messageId);
/**
*
* Get relative path to base directory or base URL.
*
* @param tree
...
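
A hedged sketch of the round trip the extended MessageStorage interface now allows: store a tree, then fetch it back by its message id. The helper class is hypothetical, and note that both implementations touched by this commit still leave get() as a stub returning null:

import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;

public class MessageStorageRoundTripSketch {
    public MessageTree roundTrip(MessageStorage storage, MessageTree tree) {
        storage.store(tree);                     // persist the tree
        return storage.get(tree.getMessageId()); // fetch it back by message id
    }
}
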
@@ -139,4 +139,10 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis
m_active = false;
}
}
@Override
public MessageTree get(String messageId) {
// TODO Auto-generated method stub
return null;
}
}
@@ -11,6 +11,7 @@ import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import com.dianping.cat.Cat;
import com.dianping.cat.CatConstants;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
@@ -29,7 +30,7 @@ public abstract class CatFilter implements Filter {
// setup for thread local data
Cat.setup(sessionToken);
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction("URL", req.getRequestURI()); Transaction t = cat.newTransaction(CatConstants.TYPE_URL, req.getRequestURI());
t.setStatus(Transaction.SUCCESS);
logRequestClientInfo(cat, req);
@@ -81,6 +82,6 @@ public abstract class CatFilter implements Filter {
sb.append('?').append(qs);
}
cat.logEvent("URL", "Payload", Event.SUCCESS, sb.toString()); cat.logEvent(CatConstants.TYPE_URL, CatConstants.NAME_PAYLOAD, Event.SUCCESS, sb.toString());
}
}
Manifest-Version: 1.0
Class-Path:
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.InputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.annotation.Inject;
public class DefaultInputChannel implements InputChannel {
@Inject
private MessageCodec m_codec;
private InputStream m_in;
@Override
public void close() {
if (m_in != null) {
try {
m_in.close();
m_in = null;
} catch (IOException e) {
// ignore it
}
}
}
@Override
public void initialize(InputStream in) {
m_in = in;
}
@Override
public MessageTree read(int index, int length) throws IOException {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
buf.writeBytes(m_in, length);
MessageTree tree = new DefaultMessageTree();
m_codec.decode(buf, tree);
return tree;
}
@Override
public boolean isExpired() {
// TODO Auto-generated method stub
return false;
}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultInputChannelManager extends ContainerHolder implements InputChannelManager, Initializable, LogEnabled {
@Inject
private MessagePathBuilder m_builder;
@Inject
private String m_baseDir = "target/hdfs";
@Inject
private URI m_serverUri;
private FileSystem m_fs;
private Path m_basePath;
private String m_ipAddress;
private Map<String, InputChannel> m_channels = new HashMap<String, InputChannel>();
private Logger m_logger;
@Override
public void cleanupChannels() {
try {
List<String> expired = new ArrayList<String>();
for (Map.Entry<String, InputChannel> e : m_channels.entrySet()) {
if (e.getValue().isExpired()) {
expired.add(e.getKey());
}
}
for (String path : expired) {
InputChannel channel = m_channels.remove(path);
closeChannel(channel);
}
} catch (Exception e) {
m_logger.warn("Error when doing cleanup!", e);
}
}
@Override
public void closeAllChannels() {
for (InputChannel channel : m_channels.values()) {
closeChannel(channel);
}
}
@Override
public void closeChannel(InputChannel channel) {
channel.close();
super.release(channel);
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void initialize() throws InitializationException {
try {
Configuration config = new Configuration();
FileSystem fs;
config.setInt("io.file.buffer.size", 8192);
if (m_serverUri == null) {
fs = FileSystem.getLocal(config);
} else {
fs = FileSystem.get(m_serverUri, config); // TODO Not tested yet
}
m_fs = fs;
m_basePath = new Path(m_fs.getWorkingDirectory(), m_baseDir);
} catch (Exception e) {
throw new InitializationException("Error when getting HDFS file system.", e);
}
try {
InetAddress localHost = InetAddress.getLocalHost();
m_ipAddress = localHost.getHostAddress();
} catch (UnknownHostException e) {
m_logger.warn("Unable to get local host!", e);
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
public void setServerUri(String serverUri) {
m_serverUri = URI.create(serverUri);
}
@Override
public InputChannel openChannel(String messageId) throws IOException {
String path = m_builder.getHdfsPath(messageId);
InputChannel channel = m_channels.get(path);
if (channel == null) {
Path file = new Path(m_basePath, path + "-0");
FSDataInputStream in = m_fs.open(file);
channel = lookup(InputChannel.class);
channel.initialize(in);
m_channels.put(path, channel);
}
return channel;
}
}
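
For illustration, not in this commit: how the new input-channel classes appear intended to work together when reading a stored tree back. The helper class is hypothetical, and the message id and byte length are assumed to come from some index of stored messages:

import java.io.IOException;

import com.dianping.cat.job.hdfs.InputChannel;
import com.dianping.cat.job.hdfs.InputChannelManager;
import com.dianping.cat.message.spi.MessageTree;

public class InputChannelUsageSketch {
    public MessageTree readBack(InputChannelManager manager, String messageId, int length) throws IOException {
        // the manager maps the message id to an HDFS path and opens (or reuses) a channel for it
        InputChannel channel = manager.openChannel(messageId);
        // channels are cached per path inside the manager; cleanupChannels()/closeAllChannels()
        // take care of their lifecycle, so the caller only reads
        return channel.read(0, length);
    }
}
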
@@ -80,4 +80,5 @@ public class DefaultOutputChannel implements OutputChannel {
return true;
}
}
@@ -144,4 +144,10 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos
}
}
}
@Override
public MessageTree get(String messageId) {
// TODO Auto-generated method stub
return null;
}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.InputStream;
import com.dianping.cat.message.spi.MessageTree;
public interface InputChannel {
/**
* Close the channel.
*/
public void close();
/**
* Initialize the channel with an input stream.
*
* @param in
*/
public void initialize(InputStream in);
/**
* Check if the channel is expired.
*
* @return true if the channel is expired, false otherwise.
*/
public boolean isExpired();
/**
* Fetch a message tree from HDFS.
*
* @param index
* @param length
* @return
* @throws IOException
*/
public MessageTree read(int index, int length) throws IOException;
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
public interface InputChannelManager {
public void cleanupChannels();
public void closeAllChannels();
public void closeChannel(InputChannel channel);
public InputChannel openChannel(String messageId) throws IOException;
}
@@ -33,4 +33,5 @@ public interface OutputChannel {
* @throws IOException
*/
public boolean write(MessageTree tree) throws IOException;
}
@@ -8,9 +8,10 @@ import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.dianping.cat.job.sql.MutiFileInputFormat;
public class MessageTreeInputFormat extends MutiFileInputFormat<LongWritable, MessageTreeWritable> {
@Override
public RecordReader<LongWritable, MessageTreeWritable> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
@@ -19,7 +20,6 @@ public class MessageTreeInputFormat extends FileInputFormat<LongWritable, Messag
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// the file is already small enough, so do not need to split it
return false;
}
}
package com.dianping.cat.job.sql;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.security.TokenCache;
public abstract class MutiFileInputFormat<K,V> extends FileInputFormat<K, V> {
public List<FileStatus> listStatus(JobContext job) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration());
List<IOException> errors = new ArrayList<IOException>();
List<PathFilter> filters = new ArrayList<PathFilter>();
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
for (int i = 0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
} else {
for (FileStatus globStat : matches) {
addFileStat(result, inputFilter, fs, globStat);
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
return result;
}
public void addFileStat(List<FileStatus> result, PathFilter inputFilter, FileSystem fs, FileStatus globStat)
throws IOException {
if (globStat.isDir()) {
for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
addFileStat(result, inputFilter, fs, stat);
}
} else {
result.add(globStat);
System.out.println(globStat.getPath().getName());
}
}
private static class MultiPathFilter implements PathFilter {
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
}
}
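
A reading of the override above, not a statement from the commit: the stock FileInputFormat.listStatus() in this Hadoop version does not recurse through nested subdirectories, while the message storage lays files out in nested date/hour folders (the sample path target/hdfs/20120306/23/null/ in SqlJobMain below hints at this), so the recursive addFileStat() lets a job point at a parent directory. A minimal driver fragment with a hypothetical helper and path:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.dianping.cat.job.mapreduce.MessageTreeInputFormat;

public class NestedInputSketch {
    public void addNestedInput(Job job, String hourDir) throws Exception {
        // MessageTreeInputFormat extends MutiFileInputFormat, so every file under hourDir is picked up
        job.setInputFormatClass(MessageTreeInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(hourDir)); // e.g. "target/hdfs/20120306/23/"
    }
}
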
package com.dianping.cat.job.sql;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.dianping.cat.job.mapreduce.MessageTreeInputFormat;
import com.site.helper.Files;
public class SqlJobMain extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new Configuration(), new SqlJobMain(), args);
System.exit(exitCode);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "Sql Analyzer");
job.setJarByClass(SqlJobMain.class);
job.setMapperClass(SqlJobMapper.class);
job.setReducerClass(SqlJobReducer.class);
job.setInputFormatClass(MessageTreeInputFormat.class);
job.setOutputKeyClass(SqlStatementKey.class);
job.setOutputValueClass(SqlJobResult.class);
job.setPartitionerClass(SqlJobPatitioner.class);
job.setMapOutputKeyClass(SqlStatementKey.class);
job.setMapOutputValueClass(SqlStatementValue.class);
job.setNumReduceTasks(2);
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd/HH/");
String dateStr = sdf.format(new Date());
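// NOTE: dateStr is computed above but not used; the input path below is hard-coded to a sample hour of local test data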
String path = "target/hdfs/20120306/23/null/";
FileInputFormat.addInputPath(job, new Path(path));
FileOutputFormat.setOutputPath(job, new Path("target/sql"));
Files.forDir().delete(new File("target/sql"), true);
return job.waitForCompletion(true) ? 0 : 1;
}
}
package com.dianping.cat.job.sql;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.dianping.cat.job.mapreduce.MessageTreeWritable;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageTree;
public class SqlJobMapper extends Mapper<Object, MessageTreeWritable, SqlStatementKey, SqlStatementValue> {
public static final String DEFAULT_DOMAIN = "NoDomain";
private void handle(Context context, MessageTree tree) throws IOException, InterruptedException {
Message message = tree.getMessage();
String domain = tree.getDomain();
if (domain == null || domain.length() == 0) {
domain = DEFAULT_DOMAIN;
}
if (message instanceof Transaction) {
Transaction transaction = (Transaction) message;
processTransaction(context, transaction, tree, domain);
}
}
public void map(Object key, MessageTreeWritable value, Context context) throws IOException, InterruptedException {
MessageTree message = value.get();
handle(context, message);
}
private void processTransaction(Context context, Transaction transaction, MessageTree tree, String domain)
throws IOException, InterruptedException {
String type = transaction.getType();
if (type.equals("SQL")) {
SqlStatementKey statementKey = new SqlStatementKey();
String statement = transaction.getName();
long duration = transaction.getDuration();
int flag = 0;
statementKey.setDomain(new Text(domain)).setStatement(new Text(statement));
if (!transaction.getStatus().equals(Transaction.SUCCESS)) {
flag = 1;
}
SqlStatementValue result = new SqlStatementValue(flag, duration);
context.write(statementKey, result);
}
List<Message> messageList = transaction.getChildren();
for (Message message : messageList) {
if (message instanceof Transaction) {
Transaction temp = (Transaction) message;
processTransaction(context, temp, tree, domain);
}
}
}
}
\ No newline at end of file
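
To make the mapper's output concrete, a sketch of the key/value pair it would emit for one failed "SQL" transaction; the class, domain, statement and duration are illustrative values, not from this commit:

import org.apache.hadoop.io.Text;

import com.dianping.cat.job.sql.SqlStatementKey;
import com.dianping.cat.job.sql.SqlStatementValue;

public class SqlMapperOutputSketch {
    public void example() {
        // one "SQL" transaction: domain "MyApp", statement "SQLStatement3", 42 ms, failed call
        SqlStatementKey key = new SqlStatementKey();
        key.setDomain(new Text("MyApp")).setStatement(new Text("SQLStatement3"));
        SqlStatementValue value = new SqlStatementValue(1, 42); // flag 1 marks a failure, as in processTransaction()
        // the mapper emits context.write(key, value); the reducer aggregates all values that share the key
    }
}
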
package com.dianping.cat.job.sql;
import org.apache.hadoop.mapreduce.Partitioner;
public class SqlJobPatitioner extends Partitioner<SqlStatementKey, SqlStatementValue> {
@Override
public int getPartition(SqlStatementKey key, SqlStatementValue value, int numPartitions) {
// mask the sign bit: Text.hashCode() may be negative, and a negative partition index is rejected by the framework
return (key.getDomain().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
package com.dianping.cat.job.sql;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
public class SqlJobReducer extends Reducer<SqlStatementKey, SqlStatementValue, SqlStatementKey, SqlJobResult> {
public void reduce(SqlStatementKey key, Iterable<SqlStatementValue> values, Context context) throws IOException,
InterruptedException {
SqlJobResult result = new SqlJobResult();
for (SqlStatementValue val : values) {
result.add(val.getValue(), val.getFlag());
}
context.write(key, result);
}
}
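
Since the job configures no explicit output format, Hadoop's default TextOutputFormat writes key.toString() and value.toString() separated by a tab. With three illustrative calls of 10, 20 and 30 ms against the same statement (one failed, none over the 50 ms long-call threshold defined in SqlJobResult below), one output line would look roughly like this (standard deviation rounded):

[domain:MyApp statement:SQLStatement3]	Count: 3	Sum: 60.0	Sum2: 1400.0	Std: 8.16	Success: 2	Failure: 1	Long: 0
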
package com.dianping.cat.job.sql;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class SqlJobResult implements Writable {
private static final double LONG_TIME = 50;
private int m_count;
private double m_sum;
private double m_sum2;
private int m_successCount;
private int m_failureCount;
private int m_longTimeCount;
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Count: ").append(m_count).append("\t").append("Sum: ").append(m_sum).append("\t")//
.append("Sum2: ").append(m_sum2).append("\t").append("Std: ").append(getStd());
sb.append("\t").append("Success: ").append(m_successCount);
sb.append("\t").append("Failure: ").append(m_failureCount).append("\t").append("Long: ").append(m_longTimeCount);
return sb.toString();
}
public void add(double value, int flag) {
m_count++;
m_sum += value;
m_sum2 = m_sum2 + value * value;
if (flag == 1) {
m_failureCount++;
} else {
m_successCount++;
}
if (value > LONG_TIME) {
m_longTimeCount++;
}
}
public double getAvg() {
if (m_count == 0) {
return 0;
}
return m_sum / m_count;
}
private double getStd() {
if (m_count == 0) {
return 0;
}
double avg = getAvg();
// standard deviation via E[X^2] - (E[X])^2
return Math.sqrt(m_sum2 / m_count - avg * avg);
}
@Override
public void readFields(DataInput arg0) throws IOException {
throw new UnsupportedOperationException(
"This method should never be called, please check with the author if any problem.");
}
@Override
public void write(DataOutput arg0) throws IOException {
throw new UnsupportedOperationException(
"This method should never be called, please check with the author if any problem.");
}
}
\ No newline at end of file
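
A quick arithmetic check of those aggregates with the same three illustrative durations (10, 20, 30 ms):

// count = 3, sum = 10 + 20 + 30 = 60, sum2 = 100 + 400 + 900 = 1400
// avg = 60 / 3 = 20
// std = sqrt(sum2 / count - avg * avg) = sqrt(1400 / 3 - 400) = sqrt(66.67) ≈ 8.16
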
package com.dianping.cat.job.sql;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class SqlStatementKey implements WritableComparable<SqlStatementKey> {
private Text m_statement;
private Text m_domain;
public SqlStatementKey() {
m_statement = new Text();
m_domain = new Text();
}
public Text getDomain() {
return m_domain;
}
public SqlStatementKey setDomain(Text domain) {
this.m_domain = domain;
return this;
}
public Text getStatement() {
return m_statement;
}
public SqlStatementKey setStatement(Text statement) {
this.m_statement = statement;
return this;
}
@Override
public void write(DataOutput out) throws IOException {
m_domain.write(out);
m_statement.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
m_domain.readFields(in);
m_statement.readFields(in);
}
@Override
public int compareTo(SqlStatementKey key) {
if (m_domain.toString().equals(key.getDomain().toString())) {
if (m_statement.toString().equals(key.getStatement().toString())) {
return 0;
} else {
return m_statement.compareTo(key.getStatement());
}
}
return m_domain.compareTo(key.getDomain());
}
@Override
public String toString() {
return String.format("[domain:%s statement:%s]", m_domain, m_statement);
}
}
package com.dianping.cat.job.sql;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class SqlStatementValue implements Writable {
public int m_flag;
public double m_value;
public SqlStatementValue(){
}
public int getFlag() {
return m_flag;
}
public double getValue() {
return m_value;
}
public SqlStatementValue(int flag, double value) {
m_flag = flag;
m_value = value;
}
@Override
public void readFields(DataInput input) throws IOException {
m_flag = input.readInt();
m_value = input.readDouble();
}
@Override
public void write(DataOutput output) throws IOException {
output.writeInt(m_flag);
output.writeDouble(m_value);
}
}
package com.dianping.cat.job.hdfs;
import junit.framework.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
@RunWith(JUnit4.class)
public class HdfsMessageStorageTest extends CatTestCase {
private DefaultMessageTree newMessageTree(String id) {
DefaultMessageTree tree = new DefaultMessageTree();
tree.setDomain("domain");
tree.setHostName("hostName");
tree.setIpAddress("ipAddress");
tree.setMessageId(id);
tree.setParentMessageId("parentMessageId");
tree.setRootMessageId("rootMessageId");
tree.setSessionToken("sessionToken");
tree.setThreadId("threadId");
tree.setThreadName("threadName");
return tree;
}
@Test
public void test() throws Exception {
MessageStorage storage = lookup(MessageStorage.class, "hdfs");
MessageManager manager = lookup(MessageManager.class);
MessageTree tree = newMessageTree("abcdef");
Transaction t = new DefaultTransaction("t", "n", manager);
tree.setMessage(t);
storage.store(tree);
MessageTree actual = storage.get(tree.getMessageId());
Assert.assertTrue(tree.equals(actual));
((HdfsMessageStorage) storage).dispose();
}
}
package com.dianping.cat.job.sql;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.job.hdfs.CatTestCase;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.helper.Stringizers;
@RunWith(JUnit4.class)
public class SqlJobDataProduceTest extends CatTestCase {
@Test
public void test() throws Exception {
MessageStorage storage = lookup(MessageStorage.class, "hdfs");
MessageProducer producer = lookup(MessageProducer.class);
InMemoryQueue queue = lookup(InMemoryQueue.class);
for (int i = 0; i < 10000; i++) {
Transaction t = producer.newTransaction("URL", "MyPage" + (int) (i / 500));
try {
// do your business here
t.addData("k1", "v1");
t.addData("k2", "v2");
t.addData("k3", "v3");
Thread.sleep(1);
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
String sqlStatement = "SQLStatement" + i / 500;
String sqlParaMeter = "SqlParaMeters";
Transaction sqlTran = producer.newTransaction("SQL", sqlStatement);
producer.logEvent("SQL.PARAM", sqlStatement, Transaction.SUCCESS,
Stringizers.forJson().compact().from(sqlParaMeter));
sqlTran.addData(sqlStatement + "detail...");
sqlTran.complete();
if (i % 2 == 1) {
sqlTran.setStatus(Message.SUCCESS);
} else {
sqlTran.setStatus("Error");
}
DefaultTransaction def = (DefaultTransaction) sqlTran;
def.setDuration(i % 100);
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
t.setStatus(e);
} finally {
t.complete();
}
MessageTree tree = queue.poll(0);
//tree.setDomain("Domain" + i % 3);
storage.store(tree);
}
((HdfsMessageStorage) storage).dispose();
}
}