diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java index a05a34e194c6300e5a42e7e420d98afa95a5951f..d78c21bd97b9485d7e9ff29d526da74f2d3d7059 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/transaction/TransactionAnalyzer.java @@ -181,7 +181,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer extends TagThreadSupport { - public void close(); + public void close() throws IOException; - public void deleteAndCreate(); - - public List findAllByIds(List ids); + public void deleteAndCreate() throws IOException; - public T findById(String id); + public List findAllByIds(List ids) throws IOException;; - public void initialize(Class type, File path) throws IOException; + public T findById(String id) throws IOException;; - public boolean storeById(String id, T data); + public void initialize(Class type, File baseDir, String logicalPath) throws IOException; + + public boolean storeById(String id, T data) throws IOException;; } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java b/cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java index 4d4f0566c7fed8a135a6c85bb55c2ccfaec0bbe1..235918ef464ce26ebd69ce5c7b4e8aaff43f83de 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/TagThreadSupport.java @@ -1,5 +1,6 @@ package com.dianping.cat.storage; +import java.io.IOException; import java.util.List; /** @@ -9,11 +10,11 @@ import java.util.List; * Sample tags: "thread:101", "session:abc", "request:xyz" */ public interface TagThreadSupport { - public boolean storeById(String id, T data, String... tags); + public boolean storeById(String id, T data, String... tags) throws IOException;; - public List findAllIdsByTag(String tag); + public List findAllIdsByTag(String tag) throws IOException;; - public T findNextById(String id, Direction direction, String tag); + public T findNextById(String id, Direction direction, String tag) throws IOException;; public static enum Direction { FORWARD, diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractFileBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractFileBucket.java index f6074882fa95dba0745e2503685de65dce4e0309..7d6bcfa21d6efc956034fc99ea84839e43b3f152 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractFileBucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/AbstractFileBucket.java @@ -174,7 +174,8 @@ public abstract class AbstractFileBucket implements Bucket, TagThreadSuppo } @Override - public void initialize(Class type, File path) throws IOException { + public void initialize(Class type, File baseDir, String logicalPath) throws IOException { + File path = new File(baseDir, logicalPath); m_writeLock = new ReentrantLock(); m_readLock = new ReentrantLock(); m_file = path; diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java index 814ec06882cc882c6bf9ec3bdf4a9273fd14ee39..cad84a3cdc37bcb9b670e314d140ff7a13f1be02 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucket.java @@ -21,8 +21,7 @@ public class DefaultBucket extends AbstractFileBucket { return (T) bytes; } else { - throw new UnsupportedOperationException(String.format( - "Only String or byte[] are supported so far, but was %s.", m_type)); + throw new UnsupportedOperationException(String.format("Only String or byte[] are supported so far, but was %s.", m_type)); } } @@ -40,25 +39,23 @@ public class DefaultBucket extends AbstractFileBucket { buf.writeInt(bytes.length); buf.writeBytes(bytes); } else { - throw new UnsupportedOperationException(String.format( - "Only String or byte[] are supported so far, but was %s.", m_type)); + throw new UnsupportedOperationException(String.format("Only String or byte[] are supported so far, but was %s.", m_type)); } } @Override - public void initialize(Class type, File path) throws IOException { - super.initialize(type, path); + public void initialize(Class type, File baseDir, String logicalPath) throws IOException { + super.initialize(type, baseDir, logicalPath); m_type = type; if (m_type != String.class && m_type != byte[].class) { - throw new UnsupportedOperationException(String.format( - "Only String or byte[] are supported so far, but was %s.", m_type)); + throw new UnsupportedOperationException(String.format("Only String or byte[] are supported so far, but was %s.", m_type)); } } @Override - protected boolean isAutoFlush() { - return true; - } + protected boolean isAutoFlush() { + return true; + } } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java index b85f77aa11f01f49dc5f490c52cd274336a58cb6..cfa5519d232fe0e797dc20bc64165c42f2073cbd 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/internal/DefaultBucketManager.java @@ -22,7 +22,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag protected Bucket createBucket(String path, Class type) throws IOException { Bucket bucket = lookup(Bucket.class, type.getName()); - bucket.initialize(type, new File(m_baseDir, path)); + bucket.initialize(type, new File(m_baseDir), path); return bucket; } @@ -115,7 +115,11 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag @Override public void closeBucket(Bucket bucket) { - bucket.close(); + try { + bucket.close(); + } catch (IOException e) { + e.printStackTrace(); + } release(bucket); } } diff --git a/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java b/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java index ea6a5ff7388b3a8779438c0e1e53e4ab64dbaf17..6fa649aff1a648fd1c2b1cd4b5f211fa1cb79b2e 100644 --- a/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java +++ b/cat-core/src/test/java/com/dianping/cat/storage/BucketTest.java @@ -53,7 +53,7 @@ public class BucketTest extends ComponentTestCase { // close and reload it, check if everything is okay bucket.close(); - bucket.initialize(byte[].class, new File("target/bucket/bytes")); + bucket.initialize(byte[].class, new File("target/bucket/"), "bytes"); // store it and load it for (int i = 0; i < 100; i++) { @@ -99,7 +99,7 @@ public class BucketTest extends ComponentTestCase { // close and reload it, check if everything is okay bucket.close(); - bucket.initialize(MessageTree.class, new File("target/bucket/message")); + bucket.initialize(MessageTree.class, new File("target/bucket/"), "message"); // check next message in the same thread for (int i = 0; i < groups - 1; i++) { @@ -135,7 +135,7 @@ public class BucketTest extends ComponentTestCase { // close and reload it, check if everything is okay bucket.close(); - bucket.initialize(String.class, new File("target/bucket/data")); + bucket.initialize(String.class, new File("target/bucket/"), "data"); // store it and load it for (int i = 0; i < 100; i++) { diff --git a/cat-home/pom.xml b/cat-home/pom.xml index a9cf604163d3201343c8f9c48802cb8931c610aa..956ba3bc05386eef2219eee4e59946e4737cf405 100644 --- a/cat-home/pom.xml +++ b/cat-home/pom.xml @@ -19,6 +19,11 @@ com.dianping.cat cat-job + + org.mortbay.jetty + jsp-api-2.1 + 6.1.14 + com.site.dal dal-xml @@ -40,7 +45,6 @@ org.mortbay.jetty jsp-2.1 - provided com.site.common diff --git a/cat-home/src/main/webapp/WEB-INF/web.xml b/cat-home/src/main/webapp/WEB-INF/web.xml index ce0fdb500b601cf7ae5fba9055dcbcf70f1536e3..535cb442a971e376867b788cd02f479fbcb1275a 100644 --- a/cat-home/src/main/webapp/WEB-INF/web.xml +++ b/cat-home/src/main/webapp/WEB-INF/web.xml @@ -5,12 +5,12 @@ mvc-servlet com.site.web.MVC - 1 + 2 cat-servlet com.dianping.cat.servlet.CatServlet - 2 + 1 mvc-servlet diff --git a/cat-home/src/test/java/com/dianping/cat/SimpleServer.java b/cat-home/src/test/java/com/dianping/cat/SimpleServer.java index b0c800ad7b1b047106339ff3e174ce704654ac8a..58d21057ce730f0a5337cdd9552545c5d185cb40 100644 --- a/cat-home/src/test/java/com/dianping/cat/SimpleServer.java +++ b/cat-home/src/test/java/com/dianping/cat/SimpleServer.java @@ -101,7 +101,7 @@ public class SimpleServer extends SimpleServerSupport { @Test public void startServer() throws Exception { // open the page in the default browser - s_adaptor.display("/cat/r"); + //s_adaptor.display("/cat/r"); System.out.println(String.format("[%s] Press any key to stop server ... ", getTimestamp())); System.in.read(); diff --git a/cat-job/datasources.xml b/cat-job/datasources.xml new file mode 100644 index 0000000000000000000000000000000000000000..fec5fdbce612cdea2aaf256e71a519c9ae14f85f --- /dev/null +++ b/cat-job/datasources.xml @@ -0,0 +1,17 @@ + + + + + 3 + 1s + 10m + 1000 + + com.mysql.jdbc.Driver + + root + 123456 + + + + diff --git a/cat-job/pom.xml b/cat-job/pom.xml index 9da12a87a195d0ad262e204c6ec2c308b150d13a..23daba1737288fdc0e292d25b189f214d2c8b6a6 100644 --- a/cat-job/pom.xml +++ b/cat-job/pom.xml @@ -14,6 +14,14 @@ com.dianping.cat cat-core + + com.site.dal + dal-jdbc + + + mysql + mysql-connector-java + org.apache.hadoop hadoop-core @@ -34,6 +42,11 @@ jackson-mapper-asl 1.9.4 + + com.dianping.tkv + fkv + 0.1.0-SNAPSHOT + junit junit @@ -47,6 +60,13 @@ maven-codegen-plugin 1.0.10 + + generate dal jdbc model + generate-sources + + dal-jdbc + + generate plexus component descriptor process-classes diff --git a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java index c39892ebb370061858036d20c530a93e7b8cdac4..17eec88ffbf7bf3db56ea62e440ccdd98d7599d2 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java +++ b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java @@ -42,6 +42,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) // .req(MessageStorage.class, "hdfs")); + all.addAll(new DatabaseConfigurator().defineComponents()); + return all; } diff --git a/cat-job/src/main/java/com/dianping/cat/job/build/DatabaseConfigurator.java b/cat-job/src/main/java/com/dianping/cat/job/build/DatabaseConfigurator.java new file mode 100644 index 0000000000000000000000000000000000000000..1686bd835abb7497ab0623ac017ddfe1f153fbaa --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/build/DatabaseConfigurator.java @@ -0,0 +1,23 @@ +package com.dianping.cat.job.build; + +import java.util.ArrayList; +import java.util.List; + +import com.dianping.cat.job.sql.dal._INDEX; +import com.site.dal.jdbc.configuration.AbstractJdbcResourceConfigurator; +import com.site.lookup.configuration.Component; + +final class DatabaseConfigurator extends AbstractJdbcResourceConfigurator { + public List defineComponents() { + List all = new ArrayList(); + + all.add(defineJdbcDataSourceConfigurationManagerComponent("datasources.xml")); + all.add(defineJdbcDataSourceComponent("cat", "${jdbc.driver}", "${jdbc.url}", "${jdbc.user}", "${jdbc.password}", + "")); + + defineSimpleTableProviderComponents(all, "cat", _INDEX.getEntityClasses()); + defineDaoComponents(all, _INDEX.getDaoClasses()); + + return all; + } +} \ No newline at end of file diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java index ede7efed82aea4f5e73cdac09ac147a4632db2e8..af6b89a28f24edccc342f51949cf3ccf79d196a4 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java @@ -1,7 +1,6 @@ package com.dianping.cat.job.hdfs; import java.io.IOException; -import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; @@ -20,7 +19,6 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; import com.dianping.cat.message.spi.MessagePathBuilder; -import com.dianping.cat.message.spi.MessageTree; import com.site.lookup.ContainerHolder; import com.site.lookup.annotation.Inject; diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucket.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucket.java new file mode 100644 index 0000000000000000000000000000000000000000..f733245f4ed4564742ce6f798d45dc9c185d5fb9 --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucket.java @@ -0,0 +1,104 @@ +/** + * + */ +package com.dianping.cat.job.hdfs; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.dianping.cat.storage.Bucket; +import com.dianping.tkv.Meta; +import com.dianping.tkv.hdfs.HdfsImpl; + +/** + * @author sean.wang + * @since Mar 9, 2012 + */ +public class HdfsBucket implements Bucket { + + private HdfsImpl hdfs; + + private int keyLength = 32; + + private int tagLength = 125; + + public void setHdfs(HdfsImpl hdfs) { + this.hdfs = hdfs; + } + + public void setKeyLength(int keyLength) { + this.keyLength = keyLength; + } + + public void setTagLength(int tagLength) { + this.tagLength = tagLength; + } + + @Override + public boolean storeById(String id, byte[] data, String... tags) { + throw new UnsupportedOperationException(); + } + + @Override + public List findAllIdsByTag(String tag) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] findNextById(String id, com.dianping.cat.storage.TagThreadSupport.Direction direction, String tag) throws IOException { + Meta meta = hdfs.getIndex(id, tag); + if (meta == null) { + return null; + } + int nextPos = meta.getTags().get(tag).getNext(); + return hdfs.get(nextPos); + } + + @Override + public void close() throws IOException { + this.hdfs.close(); + } + + @Override + public void deleteAndCreate() { + throw new UnsupportedOperationException(); + } + + @Override + public List findAllByIds(List ids) throws IOException { + List values = new ArrayList(ids.size()); + for (String id : ids) { + byte[] value = this.findById(id); + values.add(value); + } + return values; + } + + @Override + public byte[] findById(String id) throws IOException { + return hdfs.get(id); + } + + /** + * @param baseDir + * e.g /data/appdata/cat/ + * @param logicalPath + * e.g /a/b/c + */ + @Override + public void initialize(Class type, File baseDir, String logicalPath) throws IOException { + String name = new File(logicalPath).getName(); + String indexFilename = null; + String dataFilename = null; + String hdfsDir = null; + hdfs = new HdfsImpl(hdfsDir, baseDir, indexFilename, dataFilename, keyLength, tagLength); + } + + @Override + public boolean storeById(String id, byte[] data) { + throw new UnsupportedOperationException(); + } + +} diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucketManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucketManager.java new file mode 100644 index 0000000000000000000000000000000000000000..11b0133d26a2e351009b19a95f2acff0e1e5ec9e --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsBucketManager.java @@ -0,0 +1,14 @@ +/** + * + */ +package com.dianping.cat.job.hdfs; + +import com.dianping.cat.storage.internal.DefaultBucketManager; + +/** + * @author sean.wang + * @since Mar 9, 2012 + */ +public class HdfsBucketManager extends DefaultBucketManager { + +} diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java index 066f5ce95a8402f28b4619aadbfad394cff58022..215f166decc237f4845bca878cccfd21c81bf0dd 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java @@ -11,11 +11,15 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; +import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessageStorage; import com.dianping.cat.message.spi.MessageTree; import com.site.lookup.annotation.Inject; public class HdfsMessageStorage implements MessageStorage, Initializable, Disposable, LogEnabled { + @Inject + private MessagePathBuilder m_builder; + @Inject private OutputChannelManager m_manager; @@ -54,13 +58,14 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos } @Override - public String getPath(MessageTree tree) { - // TODO fix it - return null; - } + public String getPath(MessageTree tree) { + String path = m_builder.getLogViewPath(tree.getMessageId()); + + return path; + } @Override - public String store(MessageTree tree) { + public String store(MessageTree tree, String... tags) { m_job.append(tree); // Not available @@ -144,9 +149,20 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos } } } + + public MessageTree get(String messageId){ + //TODO + return null; + } + + @Override + public MessageTree next(String messageId, String tag) { + // TODO Auto-generated method stub + return null; + } @Override - public MessageTree get(String messageId) { + public MessageTree previous(String messageId, String tag) { // TODO Auto-generated method stub return null; } diff --git a/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java b/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java index 9321ab2e3ff3173e3a6d03063ab035eaf0df5cb8..2b29ec9a4ded4006b29493521e10bd3af9b5ae99 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java +++ b/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeInputFormat.java @@ -9,9 +9,9 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import com.dianping.cat.job.sql.MutiFileInputFormat; +import com.dianping.cat.job.sql.DirectoryInputFormat; -public class MessageTreeInputFormat extends MutiFileInputFormat { +public class MessageTreeInputFormat extends DirectoryInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/MutiFileInputFormat.java b/cat-job/src/main/java/com/dianping/cat/job/sql/DirectoryInputFormat.java similarity index 69% rename from cat-job/src/main/java/com/dianping/cat/job/sql/MutiFileInputFormat.java rename to cat-job/src/main/java/com/dianping/cat/job/sql/DirectoryInputFormat.java index 9c0207e57e2ab3fa6ee530cc98a6ce6d69747a69..1acb4848f87e03efb92beb92cb475038baf96ca0 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/MutiFileInputFormat.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/DirectoryInputFormat.java @@ -13,11 +13,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.InvalidInputException; import org.apache.hadoop.mapreduce.security.TokenCache; +public abstract class DirectoryInputFormat extends FileInputFormat { -public abstract class MutiFileInputFormat extends FileInputFormat { - public List listStatus(JobContext job) throws IOException { - + List result = new ArrayList(); Path[] dirs = getInputPaths(job); if (dirs.length == 0) { @@ -32,9 +31,10 @@ public abstract class MutiFileInputFormat extends FileInputFormat { if (jobFilter != null) { filters.add(jobFilter); } - //Add Default Hidden file + // Add Default Hidden file PathFilter inputFilter = new MultiPathFilter(filters); + filters.add(hiddenFileFilter); for (int i = 0; i < dirs.length; ++i) { Path p = dirs[i]; FileSystem fs = p.getFileSystem(job.getConfiguration()); @@ -44,7 +44,7 @@ public abstract class MutiFileInputFormat extends FileInputFormat { } else if (matches.length == 0) { errors.add(new IOException("Input Pattern " + p + " matches 0 files")); } else { - + for (FileStatus globStat : matches) { addFileStat(result, inputFilter, fs, globStat); } @@ -58,32 +58,37 @@ public abstract class MutiFileInputFormat extends FileInputFormat { } public void addFileStat(List result, PathFilter inputFilter, FileSystem fs, FileStatus globStat) - throws IOException { - if (globStat.isDir()) { - for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) { - addFileStat(result,inputFilter,fs,stat); - } - } else { - result.add(globStat); - - System.out.println(globStat.getPath().getName()); - } - } - + throws IOException { + if (globStat.isDir()) { + for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) { + addFileStat(result, inputFilter, fs, stat); + } + } else { + result.add(globStat); + } + } + + private static final PathFilter hiddenFileFilter = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + private static class MultiPathFilter implements PathFilter { - private List filters; + private List filters; - public MultiPathFilter(List filters) { - this.filters = filters; - } + public MultiPathFilter(List filters) { + this.filters = filters; + } - public boolean accept(Path path) { - for (PathFilter filter : filters) { - if (!filter.accept(path)) { - return false; - } - } - return true; - } - } + public boolean accept(Path path) { + for (PathFilter filter : filters) { + if (!filter.accept(path)) { + return false; + } + } + return true; + } + } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java index 9b6e27ebc3243cd32ed077a6d14f2b8133b145ee..0a244dcce5db9bbf3afbd999e57da959461b3a82 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java @@ -7,6 +7,8 @@ import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -14,19 +16,42 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import com.dianping.cat.job.mapreduce.MessageTreeInputFormat; +import com.dianping.cat.job.sql.database.SqlRecordJobMapper; +import com.dianping.cat.job.sql.database.SqlRecordJobReducer; import com.site.helper.Files; public class SqlJobMain extends Configured implements Tool { - + + private static final String DEFAUL_IN_PATH = "target/hdfs/"; + + private static final String DEFAUL_OUT_PATH = "target/cat/sql/"; + + private static final String DEFAULT_FINAL_PATH = "target/cat/result/"; + + private static final int DEFAULT_REDUCE_NUMBER = 3; + + /** + * The job process last hour data when no args default. The args[0] can set + * the number of reduce; The args[1] is for input path + */ public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Configuration(), new SqlJobMain(), args); System.exit(exitCode); } + private String getLastHoursString(int hours) { + Date date = new Date(); + long lastHour = date.getTime(); + + lastHour = lastHour - lastHour % (60 * 60 * 1000) - 60 * 60 * 1000 * hours; + date.setTime(lastHour); + return new SimpleDateFormat("yyyyMMdd/HH/").format(date); + } + @Override public int run(String[] args) throws Exception { - Configuration conf = getConf(); + /*Configuration conf = getConf(); Job job = new Job(conf, "Sql Analyzer"); job.setJarByClass(SqlJobMain.class); @@ -36,19 +61,60 @@ public class SqlJobMain extends Configured implements Tool { job.setOutputKeyClass(SqlStatementKey.class); job.setOutputValueClass(SqlJobResult.class); job.setPartitionerClass(SqlJobPatitioner.class); - job.setMapOutputKeyClass(SqlStatementKey.class); job.setMapOutputValueClass(SqlStatementValue.class); - job.setNumReduceTasks(2); - - SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd/HH/"); - String dateStr = sdf.format(new Date()); - String path = "target/hdfs/20120306/23/null/"; - - FileInputFormat.addInputPath(job, new Path(path)); - FileOutputFormat.setOutputPath(job, new Path("target/sql")); - Files.forDir().delete(new File("target/sql"), true); - + job.setNumReduceTasks(DEFAULT_REDUCE_NUMBER); + + if (args.length > 0) { + try { + job.setNumReduceTasks(Integer.parseInt(args[0])); + } catch (Exception e) { + System.out.println("The input args of the job is not correct, the args[0] should be integer!"); + return 0; + } + } + String hourStr = getLastHoursString(1); + + String inputPath = DEFAUL_IN_PATH + hourStr; + String outputPath = DEFAUL_OUT_PATH + hourStr; + + if (args.length > 1) { + if (args.length >= 2) { + inputPath = args[1]; + } + } + + System.out.println(String.format("InputPath: %s,OutPath %s", inputPath, outputPath)); + + FileInputFormat.addInputPath(job, new Path(inputPath)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + Files.forDir().delete(new File(outputPath), true); + + if (job.waitForCompletion(true)) { + return runSqlRecordJob(hourStr); + } else { + return 0; + }*/ + String hourStr = getLastHoursString(1); + return runSqlRecordJob(hourStr); + } + + /* + * insert the result to mysql + */ + private int runSqlRecordJob(String currentHour) throws Exception { + Configuration conf = getConf(); + conf.set("JobHour", currentHour); + Job job = new Job(conf, "Sql Record"); + + job.setJarByClass(SqlJobMain.class); + job.setMapperClass(SqlRecordJobMapper.class); + job.setReducerClass(SqlRecordJobReducer.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + FileInputFormat.addInputPath(job, new Path(DEFAUL_OUT_PATH+currentHour)); + FileOutputFormat.setOutputPath(job, new Path(DEFAULT_FINAL_PATH)); + Files.forDir().delete(new File(DEFAULT_FINAL_PATH), true); return job.waitForCompletion(true) ? 0 : 1; } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java index e85dd32a113bb509f4af78fadc9697a46fb455f1..28b8469198ea199111c26f0454d6961768ad6c13 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMapper.java @@ -40,15 +40,16 @@ public class SqlJobMapper extends Mapper m_durations = new ArrayList(); - private double m_sum2; - - private int m_successCount; - private int m_failureCount; private int m_longTimeCount; - public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("Count: ").append(m_count).append("\t").append("Sum: ").append(m_sum).append("\t")// - .append("Sum2: ").append(m_sum2).append("\t").append("Std: ").append(getStd()); - sb.append("\t").append("Success: ").append(m_successCount); - sb.append("\t").append("Failure: ").append(m_failureCount).append("\t").append("Long: ").append(m_longTimeCount); - return sb.toString(); - } + private double m_max = Double.MIN_VALUE; + + private double m_min = Double.MAX_VALUE; + + private double m_sum; + + private double m_sum2; + + private List m_urls = new ArrayList(); + + private DecimalFormat df = new DecimalFormat("#.##"); - public void add(double value, int flag) { - m_count++; + public void add(double value, int flag, String url ) { m_sum += value; - m_sum2 = m_sum2 + value * value; + m_sum2 += value * value; if (flag == 1) { m_failureCount++; - }else{ - m_successCount++; } - if(value>LONG_TIME){ + if (value > LONG_TIME) { m_longTimeCount++; } + if (value < m_min) { + m_min = value; + } + if (value > m_max) { + m_max = value; + } + m_durations.add(value); + + int size = m_urls.size(); + if (size == 0) { + m_urls.add(url); + } else if (size == 1 && flag == 1) { + m_urls.add(url); + } } public double getAvg() { - if (m_count == 0) { - return 0; + Collections.sort(m_durations); + int size = 95 * m_durations.size() / 100; + double sum = 0; + + for (int i = 0; i < size; i++) { + sum = sum + m_durations.get(i); } - return m_sum / m_count; - } - private double getStd() { - double ave = getAvg(); - return Math.sqrt(m_sum2 / m_count - 2 * ave * ave + ave * ave); + return sum / (double) size; } @Override @@ -63,10 +77,23 @@ public class SqlJobResult implements Writable { "This method should never be called, please check with the author if any problem."); } + public String toString() { + StringBuffer sb = new StringBuffer(); + + sb.append(m_durations.size()).append(SPIT).append(m_failureCount).append(SPIT).append(m_longTimeCount) + .append(SPIT); + sb.append(df.format(m_min)).append(SPIT).append(df.format(m_max)).append(SPIT).append(df.format(m_sum)) + .append(SPIT).append(df.format(m_sum2)).append(SPIT).append(df.format(getAvg())).append(SPIT); + + int size = m_urls.size(); + + sb.append(m_urls.get(size-1)); + return sb.toString(); + } + @Override public void write(DataOutput arg0) throws IOException { throw new UnsupportedOperationException( "This method should never be called, please check with the author if any problem."); } - } \ No newline at end of file diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java index 29bdfb05ee1ac88f039c70bd9831c38ba1b5be5a..4ca245a6a3d819cd1709411155bf98d145c9246c 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java @@ -8,15 +8,27 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; public class SqlStatementKey implements WritableComparable { - private Text m_statement; + private Text m_name; private Text m_domain; + private Text m_statement; + public SqlStatementKey() { + m_name = new Text(); m_statement = new Text(); m_domain = new Text(); } + public Text getName() { + return m_name; + } + + public SqlStatementKey setName(Text name) { + m_name = name; + return this; + } + public Text getDomain() { return m_domain; } @@ -37,12 +49,14 @@ public class SqlStatementKey implements WritableComparable { @Override public void write(DataOutput out) throws IOException { + m_name.write(out); m_domain.write(out); m_statement.write(out); } @Override public void readFields(DataInput in) throws IOException { + m_name.readFields(in); m_domain.readFields(in); m_statement.readFields(in); } @@ -50,10 +64,10 @@ public class SqlStatementKey implements WritableComparable { @Override public int compareTo(SqlStatementKey key) { if (m_domain.toString().equals(key.getDomain().toString())) { - if (m_statement.toString().equals(key.getStatement().toString())) { + if (m_name.toString().equals(key.getName().toString())) { return 0; } else { - return m_statement.compareTo(key.getStatement()); + return m_name.compareTo(key.getName()); } } return m_domain.compareTo(key.getDomain()); @@ -61,6 +75,18 @@ public class SqlStatementKey implements WritableComparable { @Override public String toString() { - return String.format("[domain:%s statement:%s]", m_domain, m_statement); + String statement = m_statement.toString(); + // to assure the output string not contain \t + statement = statement.replaceAll("\n", " "); + statement = statement.replaceAll("\t", " "); + statement = statement.replaceAll("\"", "\'"); + m_statement = new Text(statement); + + String name = m_name.toString(); + name = name.replaceAll("\n", " "); + name = name.replaceAll("\t", " "); + name = name.replaceAll("\"", "\'"); + m_name = new Text(name); + return String.format("%s\t%s\t%s", m_domain, m_name, m_statement); } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java index bc81432ded0f6b23625e25b3dc8edf0652116a8d..8c43c3d8b3e5b5702c044f26de6e1a0a5d70b9d0 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementValue.java @@ -4,6 +4,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; public class SqlStatementValue implements Writable { @@ -12,8 +13,10 @@ public class SqlStatementValue implements Writable { public double m_value; + public Text m_sampleUrl; + public SqlStatementValue(){ - + m_sampleUrl = new Text(); } public int getFlag() { @@ -23,22 +26,28 @@ public class SqlStatementValue implements Writable { public double getValue() { return m_value; } + + public String getSampleUrl(){ + return m_sampleUrl.toString(); + } - public SqlStatementValue(int flag, double value) { + public SqlStatementValue(int flag, double value ,String url) { m_flag = flag; m_value = value; + m_sampleUrl=new Text(url); } @Override public void readFields(DataInput input) throws IOException { m_flag = input.readInt(); m_value = input.readDouble(); + m_sampleUrl.readFields(input); } @Override public void write(DataOutput output) throws IOException { output.writeInt(m_flag); output.writeDouble(m_value); + m_sampleUrl.write(output); } - } diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/database/ContainerBootstrap.java b/cat-job/src/main/java/com/dianping/cat/job/sql/database/ContainerBootstrap.java new file mode 100644 index 0000000000000000000000000000000000000000..905463748d1edd4f0e9576d75d3c8f3b45a250f0 --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/database/ContainerBootstrap.java @@ -0,0 +1,26 @@ +package com.dianping.cat.job.sql.database; + +import org.codehaus.plexus.PlexusContainer; +import org.codehaus.plexus.component.repository.exception.ComponentLookupException; + +import com.site.lookup.ContainerLoader; + +public enum ContainerBootstrap { + INSTANCE; + + private PlexusContainer m_container; + + private ContainerBootstrap() { + m_container = ContainerLoader.getDefaultContainer(); + } + + @SuppressWarnings("unchecked") + public T lookup(Class role) throws ComponentLookupException { + return (T) m_container.lookup(role); + } + + @SuppressWarnings("unchecked") + public T lookup(Class role, String roleHint) throws ComponentLookupException { + return (T) m_container.lookup(role, roleHint); + } +} diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobMapper.java b/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobMapper.java new file mode 100644 index 0000000000000000000000000000000000000000..012021d8715f1af3aa5bc3db99e46e3b679f97cd --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobMapper.java @@ -0,0 +1,13 @@ +package com.dianping.cat.job.sql.database; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +public class SqlRecordJobMapper extends Mapper { + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + context.write(value, new Text(context.getConfiguration().get("JobHour", ""))); + } +} + diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobReducer.java b/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobReducer.java new file mode 100644 index 0000000000000000000000000000000000000000..cdb9df87fe9568959dc2107bcf8e4ef11757f394 --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlRecordJobReducer.java @@ -0,0 +1,48 @@ +package com.dianping.cat.job.sql.database; + +import java.io.IOException; +import java.util.Date; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import org.codehaus.plexus.component.repository.exception.ComponentLookupException; + +import com.dianping.cat.job.sql.dal.Sqlreport; +import com.dianping.cat.job.sql.dal.SqlreportDao; +import com.site.dal.jdbc.DalException; + +public class SqlRecordJobReducer extends Reducer{ + public void reduce(Text key, Iterable values, Context context) throws IOException, + InterruptedException { + Text currentHour = values.iterator().next(); + SqlReportRecord sql = new SqlReportRecord(currentHour.toString(),key.toString()); + + try { + SqlreportDao dao = ContainerBootstrap.INSTANCE.lookup(SqlreportDao.class); + Sqlreport row = dao.createLocal(); + row.setDomain(sql.getDomain()); + row.setTotalcount(sql.getTotalCount()); + row.setFailures(sql.getFailureCount()); + row.setLongsqls(sql.getLongCount()); + row.setAvg2value(sql.getAvg2()); + row.setSumvalue(sql.getSum()); + row.setSum2value(sql.getSum2()); + row.setMaxvalue(sql.getMax()); + row.setMinvalue(sql.getMin()); + row.setStatement(sql.getStatement()); + row.setName(sql.getName()); + row.setSamplelink(sql.getSampleLink()); + row.setTransactiondate(sql.getDate()); + row.setCreationdate(new Date()); + dao.insert(row); + } catch (ComponentLookupException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (DalException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + System.out.println(sql); + } +} diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlReportRecord.java b/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlReportRecord.java new file mode 100644 index 0000000000000000000000000000000000000000..a5f3b26946b47f99ca6dd0316fce59182eb14f98 --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/database/SqlReportRecord.java @@ -0,0 +1,210 @@ +package com.dianping.cat.job.sql.database; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; + +public class SqlReportRecord { + + private String m_domain; + + private String m_name; + + private String m_statement; + + private String m_sampleLink; + + private Date m_transactionDate; + + private int m_totalCount; + + private int m_failureCount; + + private int m_longCount; + + private double m_min; + + private double m_max; + + /** + * the avg2 is not contain the max of top5% + */ + private double m_avg2; + + private double m_sum; + + private double m_sum2; + + private Date m_creationDate; + + private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + private SimpleDateFormat hourFormat = new SimpleDateFormat("yyyyMMdd/HH"); + + private static final String SPIT = "\t"; + + public SqlReportRecord(){ + + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append(sdf.format(m_transactionDate)).append(SPIT); + sb.append(sdf.format(m_creationDate)).append(SPIT); + sb.append(m_domain).append(SPIT); + sb.append(m_name).append(SPIT); + sb.append(m_statement).append(SPIT); + sb.append(m_totalCount).append(SPIT); + sb.append(m_failureCount).append(SPIT); + sb.append(m_longCount).append(SPIT); + sb.append(m_min).append(SPIT); + sb.append(m_max).append(SPIT); + sb.append(m_sum).append(SPIT); + sb.append(m_sum2).append(SPIT); + sb.append(m_avg2).append(SPIT); + sb.append(m_sampleLink).append(SPIT); + return sb.toString(); + } + + // domain1 SQLStatement Internal9 500 500 500 100 199 74750 11591750 147 + public SqlReportRecord(String currentHour,String text) { + + try { + m_transactionDate = hourFormat.parse(currentHour); + } catch (ParseException e) { + Date error = new Date(); + error.setTime(0); + m_transactionDate =error ; + } + + m_creationDate = new Date(); + String[] params = text.split("\t"); + m_domain = params[0]; + m_name = params[1]; + m_statement = params[2]; + m_totalCount = Integer.parseInt(params[3]); + m_failureCount = Integer.parseInt(params[4]); + m_longCount = Integer.parseInt(params[5]); + m_min = Double.parseDouble(params[6]); + m_max = Double.parseDouble(params[7]); + m_sum = Double.parseDouble(params[8]); + m_sum2 = Double.parseDouble(params[9]); + m_avg2 = Double.parseDouble(params[10]); + m_sampleLink = params[11]; + } + + + public String getName() { + return m_name; + } + + public void setName(String name) { + m_name = name; + } + + public String getDomain() { + return m_domain; + } + + public void setDomain(String domain) { + m_domain = domain; + } + + public String getStatement() { + return m_statement; + } + + public void setStatement(String statement) { + m_statement = statement; + } + + public String getSampleLink() { + return m_sampleLink; + } + + public void setSampleLink(String sampleLink) { + m_sampleLink = sampleLink; + } + + public Date getDate() { + return m_transactionDate; + } + + public void setDate(Date date) { + m_transactionDate = date; + } + + public int getTotalCount() { + return m_totalCount; + } + + public void setTotalCount(int totalCount) { + m_totalCount = totalCount; + } + + public int getFailureCount() { + return m_failureCount; + } + + public void setFailureCount(int failureCount) { + m_failureCount = failureCount; + } + + public int getLongCount() { + return m_longCount; + } + + public void setLongCount(int longCount) { + m_longCount = longCount; + } + + public double getMin() { + return m_min; + } + + public void setMin(double min) { + m_min = min; + } + + public double getMax() { + return m_max; + } + + public void setMax(double max) { + m_max = max; + } + + public double getAvg2() { + return m_avg2; + } + + public void setAvg2(double avg2) { + m_avg2 = avg2; + } + + public double getSum() { + return m_sum; + } + + public void setSum(double sum) { + m_sum = sum; + } + + public double getSum2() { + return m_sum2; + } + + public void setSum2(double sum2) { + m_sum2 = sum2; + } + + public Date getCreatTime() { + return m_creationDate; + } + + public void setCreatTime(Date creatTime) { + m_creationDate = creatTime; + } + +} diff --git a/cat-job/src/main/resources/META-INF/dal/jdbc/codegen.xml b/cat-job/src/main/resources/META-INF/dal/jdbc/codegen.xml new file mode 100644 index 0000000000000000000000000000000000000000..b28ad917484b9e36538350d4c81165719825a698 --- /dev/null +++ b/cat-job/src/main/resources/META-INF/dal/jdbc/codegen.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + FROM WHERE = ${key-id}]]> + + + () VALUES()]]> + + + + SET WHERE = ${key-id}]]> + + + + WHERE = ${key-id}]]> + + + + + diff --git a/cat-job/src/main/resources/META-INF/dal/jdbc/dal.xml b/cat-job/src/main/resources/META-INF/dal/jdbc/dal.xml new file mode 100644 index 0000000000000000000000000000000000000000..0b9c45597ae38fdb572fd4b9b4dbe369c86d82ce --- /dev/null +++ b/cat-job/src/main/resources/META-INF/dal/jdbc/dal.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + diff --git a/cat-job/src/main/resources/META-INF/dal/jdbc/manifest.xml b/cat-job/src/main/resources/META-INF/dal/jdbc/manifest.xml new file mode 100644 index 0000000000000000000000000000000000000000..014664533febefeea9cf71fbd8c9c3735624e840 --- /dev/null +++ b/cat-job/src/main/resources/META-INF/dal/jdbc/manifest.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/cat-job/src/main/resources/META-INF/plexus/components.xml b/cat-job/src/main/resources/META-INF/plexus/components.xml index d5128ece51ea2c6b1845f1433e0d3239d9b3b2bf..f2ef483e95726ff75bd4fc432769ecf0a865bdc8 100644 --- a/cat-job/src/main/resources/META-INF/plexus/components.xml +++ b/cat-job/src/main/resources/META-INF/plexus/components.xml @@ -44,5 +44,55 @@ + + com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager + com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager + + datasources.xml + + + + com.site.dal.jdbc.datasource.DataSource + cat + com.site.dal.jdbc.datasource.JdbcDataSource + + cat + 3 + 1s + 10m + 1000 + + ${jdbc.driver} + ${jdbc.url} + ${jdbc.user} + ${jdbc.password} + + + + + + com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager + + + + + com.site.dal.jdbc.mapping.TableProvider + sqlreport + com.site.dal.jdbc.mapping.SimpleTableProvider + + sqlreport + sqlreport + cat + + + + com.dianping.cat.job.sql.dal.SqlreportDao + com.dianping.cat.job.sql.dal.SqlreportDao + + + com.site.dal.jdbc.QueryEngine + + + diff --git a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java index 050433baa734d4a626b156a5bbc7ba0455ffa9f1..a000e4ccef2f938a324a1c31503e7f3b80efbef2 100644 --- a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java +++ b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java @@ -22,56 +22,71 @@ public class SqlJobDataProduceTest extends CatTestCase { MessageStorage storage = lookup(MessageStorage.class, "hdfs"); MessageProducer producer = lookup(MessageProducer.class); InMemoryQueue queue = lookup(InMemoryQueue.class); - - for (int i = 0; i < 10000; i++) { - Transaction t = producer.newTransaction("URL", "MyPage" + (int) (i / 500)); - - try { - // do your business here - t.addData("k1", "v1"); - t.addData("k2", "v2"); - t.addData("k3", "v3"); - - Thread.sleep(1); - - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); - - String sqlStatement = "SQLStatement" + i / 500; - String sqlParaMeter = "SqlParaMeters"; - Transaction sqlTran = producer.newTransaction("SQL", sqlStatement); - - producer.logEvent("SQL.PARAM", sqlStatement, Transaction.SUCCESS, - Stringizers.forJson().compact().from(sqlParaMeter)); - sqlTran.addData(sqlStatement + "detail..."); - - sqlTran.complete(); - - if (i % 2 == 1) { - sqlTran.setStatus(Message.SUCCESS); - } else { - sqlTran.setStatus("Error"); + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 10000; j++) { + Transaction t = producer.newTransaction("URL", "MyPage" + (int) (j / 500)); + + try { + // do your business here + t.addData("k1", "v1"); + t.addData("k2", "v2"); + t.addData("k3", "v3"); + + Thread.sleep(1); + + producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); + producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); + producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); + producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=..."); + + String sqlName = "SQLStatement" + j / 500; + String sqlParaMeter = "SQLParaMeter" + j / 500; + String sqlStatement = "select * from table where id=\"1\"\n order by id desc"; + Transaction sqlTran = producer.newTransaction("SQL", sqlName); + + producer.logEvent("SQL.PARAM", sqlParaMeter, Transaction.SUCCESS, + Stringizers.forJson().compact().from(sqlParaMeter)); + sqlTran.addData(sqlStatement); + + String sqlInternalName = "SQLStatement Internal" + j / 500; + String sqlParaInternal = "SQLParaMeter Internal" + j / 500; + String sqlInternal = "select * from intenal table where id=\"1\"\n order by id desc"; + Transaction internal = producer.newTransaction("SQL", sqlInternalName); + + producer.logEvent("SQL.PARAM", sqlParaInternal, Transaction.SUCCESS, Stringizers.forJson().compact() + .from(sqlParaInternal)); + internal.addData(sqlInternal); + internal.complete(); + + if (j % 2 == 1) { + internal.setStatus(Message.SUCCESS); + } else { + internal.setStatus("Error"); + } + + sqlTran.complete(); + + DefaultTransaction sqlInternalTran = (DefaultTransaction) internal; + sqlInternalTran.setDuration(j % 100 + 100); + if (j % 2 == 1) { + sqlTran.setStatus(Message.SUCCESS); + } else { + sqlTran.setStatus("Error"); + } + + DefaultTransaction def = (DefaultTransaction) sqlTran; + def.setDuration(j % 100 + 50); + t.setStatus(Message.SUCCESS); + } catch (Exception e) { + t.setStatus(e); + } finally { + t.complete(); } - - DefaultTransaction def = (DefaultTransaction) sqlTran; - def.setDuration(i % 100); - - t.setStatus(Message.SUCCESS); - } catch (Exception e) { - t.setStatus(e); - } finally { - t.complete(); + MessageTree tree = queue.poll(0); + tree.setDomain("domain" + i); + storage.store(tree); } - - MessageTree tree = queue.poll(0); - - - //tree.setDomain("Domain" + i % 3); - storage.store(tree); } - ((HdfsMessageStorage) storage).dispose(); } } diff --git a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java new file mode 100644 index 0000000000000000000000000000000000000000..900afbabfa587cdcbcbc7279695a84cfefff154e --- /dev/null +++ b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java @@ -0,0 +1,41 @@ +package com.dianping.cat.job.sql; + +import java.text.DecimalFormat; + +import junit.framework.Assert; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import com.dianping.cat.job.sql.database.SqlReportRecord; + +@RunWith(JUnit4.class) +public class SqlRecordTest { + + @Test + public void test() { + DecimalFormat df = new DecimalFormat("#.##"); + + Assert.assertEquals("1.23", df.format(1.234567)); + } + + @Test + public void test2() { + String text = "domain1 SQLStatement-Internal9 insert into mysql where is='sfsdf' 500 500 500 100 199 74750 11591750 147 www.sina.com"; + SqlReportRecord record = new SqlReportRecord("20120309/11", text); + + Assert.assertEquals("domain1", record.getDomain()); + Assert.assertEquals("SQLStatement-Internal9", record.getName()); + Assert.assertEquals("insert into mysql where is='sfsdf'", record.getStatement()); + Assert.assertEquals(500, record.getTotalCount()); + Assert.assertEquals(500, record.getFailureCount()); + Assert.assertEquals(500, record.getLongCount()); + Assert.assertEquals(100.0, record.getMin()); + Assert.assertEquals(199.0, record.getMax()); + Assert.assertEquals(74750.0, record.getSum()); + Assert.assertEquals(11591750.0, record.getSum2()); + Assert.assertEquals(147.0, record.getAvg2()); + Assert.assertEquals("www.sina.com", record.getSampleLink()); + } +} diff --git a/pom.xml b/pom.xml index db58c0fcde7152007887b1ee175778e787a5da8e..61d0717085e0a8ac97347cb1c80f2cc254e307be 100644 --- a/pom.xml +++ b/pom.xml @@ -63,7 +63,7 @@ com.site.dal dal-jdbc - 1.0.0 + 1.0.0-a1 com.site.app