From e16944fbbe6a70f5274a0f951addd88d5aa7066f Mon Sep 17 00:00:00 2001
From: youyong
Date: Wed, 7 Mar 2012 13:31:05 +0800
Subject: [PATCH] add the hadoop job

---
 .../src/main/java/META-INF/MANIFEST.MF            |  3 +
 .../java/com/dianping/cat/CatConstants.java       | 62 +++++++++++++
 .../com/dianping/cat/servlet/CatFilter.java       |  5 +-
 cat-job/src/main/java/META-INF/MANIFEST.MF        |  3 +
 .../job/mapreduce/MessageTreeInputFormat.java     |  6 +-
 .../cat/job/sql/MutiFileInputFormat.java          | 88 +++++++++++++++++++
 .../com/dianping/cat/job/sql/SqlJobMain.java      | 54 ++++++++++++
 .../dianping/cat/job/sql/SqlJobMapper.java        | 65 ++++++++++++++
 .../cat/job/sql/SqlJobPatitioner.java             | 11 +++
 .../dianping/cat/job/sql/SqlJobReducer.java       | 17 ++++
 .../dianping/cat/job/sql/SqlJobResult.java        | 72 +++++++++++++++
 .../dianping/cat/job/sql/SqlStatementKey.java     | 66 ++++++++++++++
 .../cat/job/sql/SqlStatementValue.java            | 44 ++++++++++
 .../cat/job/sql/SqlJobDataProduceTest.java        | 77 ++++++++++++++++
 14 files changed, 568 insertions(+), 5 deletions(-)
 create mode 100644 cat-consumer/src/main/java/META-INF/MANIFEST.MF
 create mode 100644 cat-core/src/main/java/com/dianping/cat/CatConstants.java
 create mode 100644 cat-job/src/main/java/META-INF/MANIFEST.MF
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/MutiFileInputFormat.java
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobPatitioner.java
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobReducer.java
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java
 create mode 100644 cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java
 create mode 100644 cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java

diff --git a/cat-consumer/src/main/java/META-INF/MANIFEST.MF b/cat-consumer/src/main/java/META-INF/MANIFEST.MF
new file mode 100644
index 000000000..254272e1c
--- /dev/null
+++ b/cat-consumer/src/main/java/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+Class-Path: 
+
diff --git a/cat-core/src/main/java/com/dianping/cat/CatConstants.java b/cat-core/src/main/java/com/dianping/cat/CatConstants.java
new file mode 100644
index 000000000..4b79fac56
--- /dev/null
+++ b/cat-core/src/main/java/com/dianping/cat/CatConstants.java
@@ -0,0 +1,62 @@
+package com.dianping.cat;
+
+public class CatConstants {
+
+   /**
+    * Config File
+    */
+   public static final String CONFIG_FILE = "/data/appdatas/cat/client.xml";
+
+   /**
+    * Pigeon Transaction Types
+    */
+   public static final String TYPE_CALL = "Call";
+
+   public static final String TYPE_RESULT = "Result";
+
+   public static final String TYPE_TIME_OUT = "PigeonTimeOut";
+
+   public static final String TYPE_SERVICE = "Service";
+
+   public static final String TYPE_REMOTE_CALL = "RemoteCall";
+
+   /**
+    * Pigeon Event Names
+    */
+   public static final String NAME_REQUEST = "PigeonRequest";
+
+   public static final String NAME_RESPONSE = "PigeonResponse";
+
+   public static final String NAME_PAYLOAD = "Payload";
+
+   public static final String NAME_TIME_OUT = "ClientTimeOut";
+
+
+   /**
+    * Pigeon Context Info
+    */
+   public static final String PIGEON_ROOT_MESSAGE_ID = "RootMessageId";
+
+   public static final String PIGEON_CURRENT_MESSAGE_ID = "CurrentMessageId";
+
+   public static final String PIGEON_SERVER_MESSAGE_ID = "ServerMessageId";
+
+   public static final String PIGEON_RESPONSE_MESSAGE_ID = "ResponseMessageId";
+
+   /**
+    * Zebra
+    */
+   public static final String TYPE_SQL = "SQL";
+
+   public static final String TYPE_SQL_PARAM = "SQL.PARAM";
+
+   /**
+    * Avatar
+    */
+   public static final String TYPE_URL = "URL";
+
+   public static final String NAME_CLIENT_INFO = "ClientInfo";
+
+   public static final String TYPE_ACTION = "Action";
+
+}
\ No newline at end of file
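A note on intended usage: with these constants centralized, instrumentation code can reference shared type and name strings instead of scattering string literals; this is exactly the substitution CatFilter makes below. A minimal sketch of the call pattern (the producer API is the one already used in this patch; the statement text is invented, and Cat.setup(...) is assumed to have run already, as CatFilter does per request):

    MessageProducer cat = Cat.getProducer();
    Transaction t = cat.newTransaction(CatConstants.TYPE_SQL, "select * from order where id = ?");

    try {
       // business logic ...
       t.setStatus(Transaction.SUCCESS);
    } catch (Exception e) {
       t.setStatus(e);
    } finally {
       t.complete();
    }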
diff --git a/cat-core/src/main/java/com/dianping/cat/servlet/CatFilter.java b/cat-core/src/main/java/com/dianping/cat/servlet/CatFilter.java
index e7579a3da..bad4b5e0f 100644
--- a/cat-core/src/main/java/com/dianping/cat/servlet/CatFilter.java
+++ b/cat-core/src/main/java/com/dianping/cat/servlet/CatFilter.java
@@ -11,6 +11,7 @@ import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 
 import com.dianping.cat.Cat;
+import com.dianping.cat.CatConstants;
 import com.dianping.cat.message.Event;
 import com.dianping.cat.message.Message;
 import com.dianping.cat.message.MessageProducer;
@@ -29,7 +30,7 @@ public abstract class CatFilter implements Filter {
 		// setup for thread local data
 		Cat.setup(sessionToken);
 		MessageProducer cat = Cat.getProducer();
-		Transaction t = cat.newTransaction("URL", req.getRequestURI());
+		Transaction t = cat.newTransaction(CatConstants.TYPE_URL, req.getRequestURI());
 
 		t.setStatus(Transaction.SUCCESS);
 		logRequestClientInfo(cat, req);
@@ -81,6 +82,6 @@ public abstract class CatFilter implements Filter {
 			sb.append('?').append(qs);
 		}
 
-		cat.logEvent("URL", "Payload", Event.SUCCESS, sb.toString());
+		cat.logEvent(CatConstants.TYPE_URL, CatConstants.NAME_PAYLOAD, Event.SUCCESS, sb.toString());
 	}
 }
diff --git a/cat-job/src/main/java/META-INF/MANIFEST.MF b/cat-job/src/main/java/META-INF/MANIFEST.MF
new file mode 100644
index 000000000..254272e1c
--- /dev/null
+++ b/cat-job/src/main/java/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+Class-Path: 
+
diff --git a/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java b/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java
index 900326750..9321ab2e3 100644
--- a/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java
+++ b/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java
@@ -8,9 +8,10 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
-public class MessageTreeInputFormat extends FileInputFormat<Object, MessageTreeWritable> {
+import com.dianping.cat.job.sql.MutiFileInputFormat;
+
+public class MessageTreeInputFormat extends MutiFileInputFormat<Object, MessageTreeWritable> {
 	@Override
 	public RecordReader<Object, MessageTreeWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
 	      throws IOException, InterruptedException {
@@ -19,7 +20,6 @@ public class MessageTreeInputFormat
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/MutiFileInputFormat.java b/cat-job/src/main/java/com/dianping/cat/job/sql/MutiFileInputFormat.java
new file mode 100644
index 000000000..e12dcbf80
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/MutiFileInputFormat.java
@@ -0,0 +1,88 @@
+package com.dianping.cat.job.sql;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+public class MutiFileInputFormat<K, V> extends FileInputFormat<K, V> {
+
+   @Override
+   public List<FileStatus> listStatus(JobContext job) throws IOException {
+      List<FileStatus> result = new ArrayList<FileStatus>();
+      Path[] dirs = getInputPaths(job);
+
+      if (dirs.length == 0) {
+         throw new IOException("No input paths specified in job");
+      }
+
+      TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration());
+
+      List<IOException> errors = new ArrayList<IOException>();
+      List<PathFilter> filters = new ArrayList<PathFilter>();
+      PathFilter jobFilter = getInputPathFilter(job);
+
+      if (jobFilter != null) {
+         filters.add(jobFilter);
+      }
+
+      PathFilter inputFilter = new MultiPathFilter(filters);
+
+      for (int i = 0; i < dirs.length; ++i) {
+         Path p = dirs[i];
+         FileSystem fs = p.getFileSystem(job.getConfiguration());
+         FileStatus[] matches = fs.globStatus(p, inputFilter);
+
+         if (matches == null) {
+            errors.add(new IOException("Input path does not exist: " + p));
+         } else if (matches.length == 0) {
+            errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+         } else {
+            for (FileStatus globStat : matches) {
+               addFileStat(result, inputFilter, fs, globStat);
+            }
+         }
+      }
+
+      if (!errors.isEmpty()) {
+         throw new InvalidInputException(errors);
+      }
+
+      return result;
+   }
+
+   // Unlike the stock FileInputFormat, recurse into sub-directories instead of
+   // treating a directory entry as a plain input file.
+   public void addFileStat(List<FileStatus> result, PathFilter inputFilter, FileSystem fs, FileStatus globStat)
+         throws IOException {
+      if (globStat.isDir()) {
+         for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
+            addFileStat(result, inputFilter, fs, stat);
+         }
+      } else {
+         result.add(globStat);
+      }
+   }
+
+   private static class MultiPathFilter implements PathFilter {
+      private List<PathFilter> filters;
+
+      public MultiPathFilter(List<PathFilter> filters) {
+         this.filters = filters;
+      }
+
+      public boolean accept(Path path) {
+         for (PathFilter filter : filters) {
+            if (!filter.accept(path)) {
+               return false;
+            }
+         }
+         return true;
+      }
+   }
+}
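Why this class exists: the message dumps are written into nested date/hour/domain directories, while the stock FileInputFormat of this Hadoop generation lists each input path only one level deep and does not recurse into sub-directories. Overriding listStatus() to recurse lets a job point at a single top-level directory. A minimal wiring sketch (the path is invented to mirror the layout SqlJobMain uses below):

    Job job = new Job(new Configuration(), "recursive input demo");

    job.setInputFormatClass(MessageTreeInputFormat.class);
    // one top-level path suffices; listStatus() walks into 20120306/23/<domain>/
    FileInputFormat.addInputPath(job, new Path("target/hdfs/20120306/"));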
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java
new file mode 100644
index 000000000..9b6e27ebc
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java
@@ -0,0 +1,54 @@
+package com.dianping.cat.job.sql;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.dianping.cat.job.mapreduce.MessageTreeInputFormat;
+import com.site.helper.Files;
+
+public class SqlJobMain extends Configured implements Tool {
+
+   public static void main(String[] args) throws Exception {
+      int exitCode = ToolRunner.run(new Configuration(), new SqlJobMain(), args);
+
+      System.exit(exitCode);
+   }
+
+   @Override
+   public int run(String[] args) throws Exception {
+      Configuration conf = getConf();
+      Job job = new Job(conf, "Sql Analyzer");
+
+      job.setJarByClass(SqlJobMain.class);
+      job.setMapperClass(SqlJobMapper.class);
+      job.setReducerClass(SqlJobReducer.class);
+      job.setInputFormatClass(MessageTreeInputFormat.class);
+      job.setOutputKeyClass(SqlStatementKey.class);
+      job.setOutputValueClass(SqlJobResult.class);
+      job.setPartitionerClass(SqlJobPatitioner.class);
+      job.setMapOutputKeyClass(SqlStatementKey.class);
+      job.setMapOutputValueClass(SqlStatementValue.class);
+      job.setNumReduceTasks(2);
+
+      // Dumps are laid out as yyyyMMdd/HH/<domain>; dateStr computes the current
+      // hour, but the input is pinned here to one locally generated test hour
+      // whose domain directory is "null".
+      SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd/HH/");
+      String dateStr = sdf.format(new Date());
+      String path = "target/hdfs/20120306/23/null/";
+
+      FileInputFormat.addInputPath(job, new Path(path));
+      FileOutputFormat.setOutputPath(job, new Path("target/sql"));
+      // clear any previous local output so the job can write target/sql afresh
+      Files.forDir().delete(new File("target/sql"), true);
+
+      return job.waitForCompletion(true) ? 0 : 1;
+   }
+}
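Because SqlJobMain goes through ToolRunner, Hadoop's generic options (-D key=value, -fs, -jt) are parsed before run() sees the remaining arguments, and the same injection can be done programmatically. A minimal launch sketch; the tuning property is illustrative only, and anything set explicitly inside run() still takes precedence:

    Configuration conf = new Configuration();

    // equivalent to passing -Dio.sort.mb=100 on the command line
    conf.setInt("io.sort.mb", 100);

    int exitCode = ToolRunner.run(conf, new SqlJobMain(), new String[0]);
    System.out.println("Sql Analyzer finished with exit code " + exitCode);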
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java
new file mode 100644
index 000000000..e85dd32a1
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java
@@ -0,0 +1,65 @@
+package com.dianping.cat.job.sql;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.dianping.cat.CatConstants;
+import com.dianping.cat.job.mapreduce.MessageTreeWritable;
+import com.dianping.cat.message.Message;
+import com.dianping.cat.message.Transaction;
+import com.dianping.cat.message.spi.MessageTree;
+
+public class SqlJobMapper extends Mapper<Object, MessageTreeWritable, SqlStatementKey, SqlStatementValue> {
+
+   public static final String DEFAULT_DOMAIN = "NoDomain";
+
+   private void handle(Context context, MessageTree tree) throws IOException, InterruptedException {
+      Message message = tree.getMessage();
+      String domain = tree.getDomain();
+
+      if (domain == null || domain.length() == 0) {
+         domain = DEFAULT_DOMAIN;
+      }
+
+      if (message instanceof Transaction) {
+         processTransaction(context, (Transaction) message, tree, domain);
+      }
+   }
+
+   @Override
+   public void map(Object key, MessageTreeWritable value, Context context) throws IOException, InterruptedException {
+      handle(context, value.get());
+   }
+
+   // Emit one (domain, statement) -> (flag, duration) pair per SQL transaction,
+   // walking the whole transaction tree recursively.
+   private void processTransaction(Context context, Transaction transaction, MessageTree tree, String domain)
+         throws IOException, InterruptedException {
+      String type = transaction.getType();
+
+      if (CatConstants.TYPE_SQL.equals(type)) {
+         SqlStatementKey statementKey = new SqlStatementKey();
+         String statement = transaction.getName();
+         long duration = transaction.getDuration();
+         int flag = 0;
+
+         statementKey.setDomain(new Text(domain)).setStatement(new Text(statement));
+
+         if (!Transaction.SUCCESS.equals(transaction.getStatus())) {
+            flag = 1;
+         }
+
+         SqlStatementValue result = new SqlStatementValue(flag, duration);
+         context.write(statementKey, result);
+      }
+
+      List<Message> messageList = transaction.getChildren();
+
+      for (Message message : messageList) {
+         if (message instanceof Transaction) {
+            processTransaction(context, (Transaction) message, tree, domain);
+         }
+      }
+   }
+}
\ No newline at end of file
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobPatitioner.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobPatitioner.java
new file mode 100644
index 000000000..e42ba9cb7
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobPatitioner.java
@@ -0,0 +1,11 @@
+package com.dianping.cat.job.sql;
+
+import org.apache.hadoop.mapreduce.Partitioner;
+
+public class SqlJobPatitioner extends Partitioner<SqlStatementKey, SqlStatementValue> {
+
+   @Override
+   public int getPartition(SqlStatementKey key, SqlStatementValue value, int numPartitions) {
+      // Mask the sign bit, as Hadoop's HashPartitioner does; a negative hashCode
+      // would otherwise yield an illegal negative partition number.
+      return (key.getDomain().hashCode() & Integer.MAX_VALUE) % numPartitions;
+   }
+}
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobReducer.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobReducer.java
new file mode 100644
index 000000000..7048f12c7
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobReducer.java
@@ -0,0 +1,17 @@
+package com.dianping.cat.job.sql;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class SqlJobReducer extends Reducer<SqlStatementKey, SqlStatementValue, SqlStatementKey, SqlJobResult> {
+
+   @Override
+   public void reduce(SqlStatementKey key, Iterable<SqlStatementValue> values, Context context) throws IOException,
+         InterruptedException {
+      SqlJobResult result = new SqlJobResult();
+
+      for (SqlStatementValue val : values) {
+         result.add(val.getValue(), val.getFlag());
+      }
+
+      context.write(key, result);
+   }
+}
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java
new file mode 100644
index 000000000..9528eaca5
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java
@@ -0,0 +1,72 @@
+package com.dianping.cat.job.sql;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class SqlJobResult implements Writable {
+
+   // duration threshold beyond which a call is counted as "long"
+   private static final double LONG_TIME = 50;
+
+   private int m_count;
+
+   private double m_sum;
+
+   private double m_sum2;
+
+   private int m_successCount;
+
+   private int m_failureCount;
+
+   private int m_longTimeCount;
+
+   public void add(double value, int flag) {
+      m_count++;
+      m_sum += value;
+      m_sum2 += value * value;
+
+      if (flag == 1) {
+         m_failureCount++;
+      } else {
+         m_successCount++;
+      }
+
+      if (value > LONG_TIME) {
+         m_longTimeCount++;
+      }
+   }
+
+   public double getAvg() {
+      if (m_count == 0) {
+         return 0;
+      }
+
+      return m_sum / m_count;
+   }
+
+   // population standard deviation: sqrt(E[x^2] - E[x]^2)
+   private double getStd() {
+      if (m_count == 0) {
+         return 0;
+      }
+
+      double avg = getAvg();
+
+      return Math.sqrt(m_sum2 / m_count - avg * avg);
+   }
+
+   @Override
+   public String toString() {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("Count: ").append(m_count).append("\t").append("Sum: ").append(m_sum).append("\t") //
+            .append("Sum2: ").append(m_sum2).append("\t").append("Std: ").append(getStd());
+      sb.append("\t").append("Success: ").append(m_successCount);
+      sb.append("\t").append("Failure: ").append(m_failureCount).append("\t").append("Long: ").append(m_longTimeCount);
+
+      return sb.toString();
+   }
+
+   @Override
+   public void readFields(DataInput in) throws IOException {
+      throw new UnsupportedOperationException("SqlJobResult is a reduce-side output value only; it is never deserialized.");
+   }
+
+   @Override
+   public void write(DataOutput out) throws IOException {
+      throw new UnsupportedOperationException("SqlJobResult is a reduce-side output value only; it is never serialized.");
+   }
+}
\ No newline at end of file
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java
new file mode 100644
index 000000000..29bdfb05e
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java
@@ -0,0 +1,66 @@
+package com.dianping.cat.job.sql;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+public class SqlStatementKey implements WritableComparable<SqlStatementKey> {
+
+   private Text m_statement;
+
+   private Text m_domain;
+
+   public SqlStatementKey() {
+      m_statement = new Text();
+      m_domain = new Text();
+   }
+
+   public Text getDomain() {
+      return m_domain;
+   }
+
+   public SqlStatementKey setDomain(Text domain) {
+      m_domain = domain;
+      return this;
+   }
+
+   public Text getStatement() {
+      return m_statement;
+   }
+
+   public SqlStatementKey setStatement(Text statement) {
+      m_statement = statement;
+      return this;
+   }
+
+   @Override
+   public void write(DataOutput out) throws IOException {
+      m_domain.write(out);
+      m_statement.write(out);
+   }
+
+   @Override
+   public void readFields(DataInput in) throws IOException {
+      m_domain.readFields(in);
+      m_statement.readFields(in);
+   }
+
+   // sort by domain first, then by statement within a domain
+   @Override
+   public int compareTo(SqlStatementKey key) {
+      int byDomain = m_domain.compareTo(key.getDomain());
+
+      if (byDomain != 0) {
+         return byDomain;
+      }
+
+      return m_statement.compareTo(key.getStatement());
+   }
+
+   @Override
+   public String toString() {
+      return String.format("[domain:%s statement:%s]", m_domain, m_statement);
+   }
+}
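The aggregation in SqlJobResult is easy to sanity-check off-cluster. A minimal sketch with invented durations in milliseconds; the third call fails and, at 120 ms, also crosses the 50 ms LONG_TIME threshold:

    SqlJobResult result = new SqlJobResult();

    result.add(10, 0);  // success
    result.add(30, 0);  // success
    result.add(120, 1); // failure, and longer than LONG_TIME

    // Count: 3  Sum: 160.0  Sum2: 15400.0  Std: ~47.8  Success: 2  Failure: 1  Long: 1
    System.out.println(result);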
diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java
new file mode 100644
index 000000000..bc81432de
--- /dev/null
+++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java
@@ -0,0 +1,44 @@
+package com.dianping.cat.job.sql;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class SqlStatementValue implements Writable {
+
+   // 0 = success, 1 = failure
+   public int m_flag;
+
+   // transaction duration
+   public double m_value;
+
+   // Writables need a no-arg constructor for deserialization
+   public SqlStatementValue() {
+   }
+
+   public SqlStatementValue(int flag, double value) {
+      m_flag = flag;
+      m_value = value;
+   }
+
+   public int getFlag() {
+      return m_flag;
+   }
+
+   public double getValue() {
+      return m_value;
+   }
+
+   @Override
+   public void readFields(DataInput input) throws IOException {
+      m_flag = input.readInt();
+      m_value = input.readDouble();
+   }
+
+   @Override
+   public void write(DataOutput output) throws IOException {
+      output.writeInt(m_flag);
+      output.writeDouble(m_value);
+   }
+}
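The write/readFields pair above is the entire contract Hadoop relies on when shuffling these values between map and reduce. A minimal round-trip check using only JDK streams (the values are invented):

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    SqlStatementValue original = new SqlStatementValue(1, 42.0);

    original.write(new DataOutputStream(bytes));

    SqlStatementValue copy = new SqlStatementValue();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    // prints "1 42.0", identical to what was written
    System.out.println(copy.getFlag() + " " + copy.getValue());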
diff --git a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java
new file mode 100644
index 000000000..050433baa
--- /dev/null
+++ b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java
@@ -0,0 +1,77 @@
+package com.dianping.cat.job.sql;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import com.dianping.cat.job.hdfs.CatTestCase;
+import com.dianping.cat.job.hdfs.HdfsMessageStorage;
+import com.dianping.cat.message.Message;
+import com.dianping.cat.message.MessageProducer;
+import com.dianping.cat.message.Transaction;
+import com.dianping.cat.message.internal.DefaultTransaction;
+import com.dianping.cat.message.io.InMemoryQueue;
+import com.dianping.cat.message.spi.MessageStorage;
+import com.dianping.cat.message.spi.MessageTree;
+import com.site.helper.Stringizers;
+
+@RunWith(JUnit4.class)
+public class SqlJobDataProduceTest extends CatTestCase {
+   @Test
+   public void test() throws Exception {
+      MessageStorage storage = lookup(MessageStorage.class, "hdfs");
+      MessageProducer producer = lookup(MessageProducer.class);
+      InMemoryQueue queue = lookup(InMemoryQueue.class);
+
+      for (int i = 0; i < 10000; i++) {
+         Transaction t = producer.newTransaction("URL", "MyPage" + i / 500);
+
+         try {
+            // do your business here
+            t.addData("k1", "v1");
+            t.addData("k2", "v2");
+            t.addData("k3", "v3");
+
+            Thread.sleep(1);
+
+            producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
+            producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
+            producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
+            producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
+
+            String sqlStatement = "SQLStatement" + i / 500;
+            String sqlParameter = "SqlParaMeters";
+            Transaction sqlTran = producer.newTransaction("SQL", sqlStatement);
+
+            producer.logEvent("SQL.PARAM", sqlStatement, Transaction.SUCCESS,
+                  Stringizers.forJson().compact().from(sqlParameter));
+            sqlTran.addData(sqlStatement + "detail...");
+
+            sqlTran.complete();
+
+            // override status and duration after completion, replacing the
+            // computed values with deterministic test data
+            if (i % 2 == 1) {
+               sqlTran.setStatus(Message.SUCCESS);
+            } else {
+               sqlTran.setStatus("Error");
+            }
+
+            DefaultTransaction def = (DefaultTransaction) sqlTran;
+            def.setDuration(i % 100);
+
+            t.setStatus(Message.SUCCESS);
+         } catch (Exception e) {
+            t.setStatus(e);
+         } finally {
+            t.complete();
+         }
+
+         MessageTree tree = queue.poll(0);
+
+         // tree.setDomain("Domain" + i % 3);
+         storage.store(tree);
+      }
+
+      ((HdfsMessageStorage) storage).dispose();
+   }
+}
-- 
GitLab