diff --git a/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java b/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java index 357e1ba51cd5a28fcf604bb5179a3ad1334d304b..4aad23009de1603cbf822b77e077958ba9dcc741 100644 --- a/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java +++ b/cat-core/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java @@ -68,7 +68,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { .req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers")); all.add(C(MessageQueue.class, DefaultMessageQueue.class) // - .config(E("size").value("100000")) // + .config(E("size").value("1000")) // .is(PER_LOOKUP)); all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) // diff --git a/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java b/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java index e65cb72bdee3e1b8b28f429244e65b62cab1ae92..cba665fef2f7608b443ce9716a05d6846f6a176e 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java +++ b/cat-core/src/main/java/com/dianping/cat/message/io/DefaultMessageQueue.java @@ -22,7 +22,7 @@ public class DefaultMessageQueue implements MessageQueue, Initializable { if (m_size > 0) { m_queue = new LinkedBlockingQueue(m_size); }else{ - m_queue = new LinkedBlockingQueue(); + m_queue = new LinkedBlockingQueue(10000); } } diff --git a/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketHierarchySender.java b/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketHierarchySender.java index e775ad2cf7e39cb0f6a3622818539f82138188c2..a4d61d5f17e04276740867f4c7c27f12ee5cb24a 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketHierarchySender.java +++ b/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketHierarchySender.java @@ -150,8 +150,9 @@ public class TcpSocketHierarchySender extends Thread implements MessageSender, L m_serverAddresses = serverAddresses; m_futures = new ArrayList(Collections. nCopies(len, null)); - ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()); + ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newFixedThreadPool(10), + Executors.newFixedThreadPool(10)); + ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { diff --git a/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java b/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java index f369766569714b820bf35ddc42600cf4a65aecff..d9e87c548063126f4c489e80735b3f420adb2a65 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java +++ b/cat-core/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java @@ -52,6 +52,10 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled private transient boolean m_active; + private static int s_error; + + private static int s_write; + @Override public void enableLogging(Logger logger) { m_logger = logger; @@ -63,8 +67,8 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled throw new RuntimeException("No server address was configured for TcpSocketSender!"); } - ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()); + ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newFixedThreadPool(10), + Executors.newFixedThreadPool(10)); ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @@ -121,11 +125,29 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled while (m_active) { try { - MessageTree tree = m_queue.poll(); + boolean isWriteable = false; + if (m_future != null && m_future.getChannel().isOpen()) { + if (m_future.getChannel().isWritable()) { + isWriteable = true; + } else { + try { + s_write++; + if (s_write % 1000 == 0) { + System.out.println("Can't write!"); + } + Thread.sleep(100); + } catch (Exception e) { + break; + } + } + } + if (isWriteable) { + MessageTree tree = m_queue.poll(); - if (tree != null) { - sendInternal(tree); - tree.setMessage(null); + if (tree != null) { + sendInternal(tree); + tree.setMessage(null); + } } } catch (Throwable t) { m_logger.error("Error when sending message over TCP socket!", t); @@ -144,8 +166,10 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled if (m_statistics != null) { m_statistics.onOverflowed(tree); } - - m_logger.error("Message queue is full in tcp socket sender!"); + s_error++; + if (s_error % 100 == 0) { + m_logger.error("Message queue is full in tcp socket sender! Number: " + s_error); + } } } diff --git a/cat-core/src/main/java/com/dianping/cat/status/StatusUpdateTask.java b/cat-core/src/main/java/com/dianping/cat/status/StatusUpdateTask.java index 1f17211c8d19e572808a5d35c23fd1b520e52983..00ba71c4e1a799a15fe0dcdda4fec5c5ce2c7e6e 100644 --- a/cat-core/src/main/java/com/dianping/cat/status/StatusUpdateTask.java +++ b/cat-core/src/main/java/com/dianping/cat/status/StatusUpdateTask.java @@ -1,5 +1,8 @@ package com.dianping.cat.status; +import java.text.SimpleDateFormat; +import java.util.Date; + import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; @@ -20,7 +23,7 @@ public class StatusUpdateTask implements Runnable, Initializable { private String m_ipAddress; - private long m_interval = 1000; // 1 ms + private long m_interval = 1000*30; // 1 ms @Override public void initialize() throws InitializationException { @@ -39,7 +42,8 @@ public class StatusUpdateTask implements Runnable, Initializable { heartbeat.addData(status.toString()); heartbeat.setStatus(Message.SUCCESS); heartbeat.complete(); - + System.out.println(new SimpleDateFormat("HH:mm:ss.SSS").format(new Date())); + System.out.println(status); long elapsed = MilliSecondTimer.currentTimeMillis() - start; if (elapsed < m_interval) { diff --git a/cat-home/src/main/webapp/css/sql.css b/cat-home/src/main/webapp/css/sql.css index 4b4d70413b87e6f4843eebd0071649d2eac367cc..9140359ec48bba199c9972e7335b44ee7aa5ddbe 100755 --- a/cat-home/src/main/webapp/css/sql.css +++ b/cat-home/src/main/webapp/css/sql.css @@ -6,11 +6,14 @@ .sql-table .odd td { background-color: #eee; font-size: small; - text-align: center; } .sql-table .even td { background-color: white; font-size: small; - text-align: center; } + +.sql-title { + background-color: #F2F2F2; + font-size: medium; +} \ No newline at end of file diff --git a/cat-home/src/main/webapp/jsp/report/problemGroup.jsp b/cat-home/src/main/webapp/jsp/report/problemGroup.jsp index 01ebb8097a649ec9c7336564855b748d9bf926c0..94bf20f8b1c04add840d214e389a6ce259a75ef3 100755 --- a/cat-home/src/main/webapp/jsp/report/problemGroup.jsp +++ b/cat-home/src/main/webapp/jsp/report/problemGroup.jsp @@ -23,7 +23,7 @@ Machines: - ${machine.value.ip} + ${machine.value.ip} diff --git a/cat-home/src/main/webapp/jsp/report/problemThread.jsp b/cat-home/src/main/webapp/jsp/report/problemThread.jsp index f2c6fb5a2a91e1f86cf82d19f41980f97cc9209f..d6dc7cb1e82c680be02ad5c1c0c1fa3b4aaaa9d3 100755 --- a/cat-home/src/main/webapp/jsp/report/problemThread.jsp +++ b/cat-home/src/main/webapp/jsp/report/problemThread.jsp @@ -23,7 +23,7 @@ Machines: - ${machine.value.ip} + ${machine.value.ip} diff --git a/cat-home/src/main/webapp/jsp/report/sql.jsp b/cat-home/src/main/webapp/jsp/report/sql.jsp index a52674d42e403af39d1872dbc31551c7e60832a2..558c9e2a4895e26bac9ac461ab04dc2fa56274e2 100755 --- a/cat-home/src/main/webapp/jsp/report/sql.jsp +++ b/cat-home/src/main/webapp/jsp/report/sql.jsp @@ -37,15 +37,15 @@ [:: show ::] ${reportRecord.record.name} - ${w:format(reportRecord.record.totalcount,'0.00')} - ${w:format(reportRecord.record.failures,'0.00')} + ${w:format(reportRecord.record.totalCount,'0.00')} + ${w:format(reportRecord.record.failureCount,'0.00')} ${w:format(reportRecord.failurePercent,'0.00%')} - ${w:format(reportRecord.record.minvalue,'0.00')} / ${w:format(reportRecord.record.maxvalue,'0.00')} / ${w:format(reportRecord.avg,'0.00')} / ${w:format(reportRecord.std,'0.00')} - ${w:format(reportRecord.record.avg2value,'0.00')} - ${w:format(reportRecord.record.sumvalue,'0.00')} - ${w:format(reportRecord.record.longsqls,'0.00')} + ${w:format(reportRecord.record.minValue,'0.00')} / ${w:format(reportRecord.record.maxValue,'0.00')} / ${w:format(reportRecord.avg,'0.00')} / ${w:format(reportRecord.std,'0.00')} + ${w:format(reportRecord.record.avg2Value,'0.00')} + ${w:format(reportRecord.record.sumValue,'0.00')} + ${w:format(reportRecord.record.longSqls,'0.00')} ${w:format(reportRecord.longPercent,'0.00%')} - Link + Link diff --git a/cat-home/src/main/webapp/jsp/report/sql_graphs.jsp b/cat-home/src/main/webapp/jsp/report/sql_graphs.jsp index 01083ab0bb44a9afbc4004e1057b2c4f2d324332..ec598877b409617c8558adb5db963a85c6e2a684 100755 --- a/cat-home/src/main/webapp/jsp/report/sql_graphs.jsp +++ b/cat-home/src/main/webapp/jsp/report/sql_graphs.jsp @@ -1,7 +1,9 @@ <%@ page contentType="text/html; charset=utf-8"%> +
SQL Statement: ${model.statement} +
${model.graph1} ${model.graph2} diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java index 7acb713d456956a592cf186e690c679fbcf79172..af7ff03811962eeef69cb7a6c9fc066d869a4f06 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannel.java @@ -30,6 +30,7 @@ public class DefaultOutputChannel implements OutputChannel { public void close() { if (m_out != null) { try { + m_out.flush(); m_out.close(); m_out = null; } catch (IOException e) { diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java index deac93db80a033b6a48c61b1fad0972e2e6d413c..a0d0b1a93e03fe24187350c588cc51897d906e21 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/HdfsMessageStorage.java @@ -40,6 +40,8 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos } catch (InterruptedException e) { // ignore it } + + this.m_manager.closeAllChannels(); } @Override diff --git a/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeReader.java b/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeReader.java index b72b494c9c0fee34ea023ac3a5f8744df2e4aed4..2c2310ca67e076ee84f7bef78f25d940c5f2b4b1 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeReader.java +++ b/cat-job/src/main/java/com/dianping/cat/job/mapreduce/MessageTreeReader.java @@ -72,22 +72,25 @@ public class MessageTreeReader extends RecordReader extends FileInputFormat { addFileStat(result, inputFilter, fs, stat); } } else { + System.out.println(globStat.getPath().getName()); result.add(globStat); } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java index a7ad825fd10f16e8148869443a6eeae2f72e78b6..27e321fa7ac28a203836c8aec2a0321588fdc29d 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobMain.java @@ -37,11 +37,9 @@ public class SqlJobMain extends Configured implements Tool { * the number of reduce; The args[1] is for input path */ public static void main(String[] args) throws Exception { - // int exitCode = ToolRunner.run(new Configuration(), new SqlJobMain(), args); - - // System.exit(exitCode); int exitCode = ToolRunner.run(new Configuration(), new SqlJobMain(), args); - + + System.exit(exitCode); } private String getLastHoursString(int hours) { @@ -55,8 +53,6 @@ public class SqlJobMain extends Configured implements Tool { @Override public int run(String[] args) throws Exception { - runSqlRecordJob(); - Configuration conf = getConf(); Job job = new Job(conf, "Sql Analyzer"); @@ -92,7 +88,7 @@ public class SqlJobMain extends Configured implements Tool { if (BASE_URL.charAt(BASE_URL.length() - 1) == '/') { BASE_URL = BASE_URL.substring(0, BASE_URL.length() - 1); } - DEFAULT_IN_PATH = BASE_URL + "/dump/"; + DEFAULT_IN_PATH = BASE_URL; DEFAULT_OUT_PATH = BASE_URL + "/sql/"; DEFAULT_FINAL_PATH = BASE_URL + "/sqlResult/"; } @@ -116,8 +112,6 @@ public class SqlJobMain extends Configured implements Tool { } else { return 0; } - // String hourStr = getLastHoursString(1); - // return runSqlRecordJob(hourStr); } /* @@ -139,23 +133,4 @@ public class SqlJobMain extends Configured implements Tool { Files.forDir().delete(new File(DEFAULT_FINAL_PATH), true); return job.waitForCompletion(true) ? 0 : 1; } - - private int runSqlRecordJob() throws Exception { - System.out.println("Insert database job start!"); - Configuration conf = getConf(); - conf.set("JobHour", "20120328/19"); - Job job = new Job(conf, "Sql Record"); - - job.setJarByClass(SqlJobMain.class); - job.setMapperClass(SqlRecordJobMapper.class); - job.setReducerClass(SqlRecordJobReducer.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - FileInputFormat.addInputPath(job, new Path("target/Test")); - String output = "target/test2"; - FileOutputFormat.setOutputPath(job, new Path(output)); - FileSystem fs = FileSystem.get(conf); - fs.delete(new Path(output), true); - return job.waitForCompletion(true) ? 0 : 1; - } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java index 25762246fc801efe9be17851dcfd9e4c967aacc4..1a0ede37820ebd642ae763751b6307b0fb304821 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlJobResult.java @@ -104,9 +104,12 @@ public class SqlJobResult implements Writable { return (int) Math.pow(2, min); } - public double getAvg() { + //get the + public double getPercent95Line() { Collections.sort(m_durations); int size = 95 * m_durations.size() / 100; + return m_durations.get(size); + /* double sum = 0; for (int i = 0; i < size; i++) { @@ -114,7 +117,8 @@ public class SqlJobResult implements Writable { } return sum / (double) size; - } + */ + } @Override public void readFields(DataInput arg0) throws IOException { @@ -125,10 +129,14 @@ public class SqlJobResult implements Writable { public String toString() { StringBuffer sb = new StringBuffer(); - sb.append(m_durations.size()).append(SPIT).append(m_failureCount).append(SPIT).append(m_longTimeCount) - .append(SPIT); - sb.append(df.format(m_min)).append(SPIT).append(df.format(m_max)).append(SPIT).append(df.format(m_sum)) - .append(SPIT).append(df.format(m_sum2)).append(SPIT).append(df.format(getAvg())).append(SPIT); + sb.append(m_durations.size()).append(SPIT); + sb.append(m_failureCount).append(SPIT); + sb.append(m_longTimeCount).append(SPIT); + sb.append(df.format(m_min)).append(SPIT); + sb.append(df.format(m_max)).append(SPIT); + sb.append(df.format(m_sum)).append(SPIT); + sb.append(df.format(m_sum2)).append(SPIT); + sb.append(df.format(getPercent95Line())).append(SPIT); int size = m_urls.size(); @@ -173,92 +181,92 @@ public class SqlJobResult implements Writable { } public List getDurations() { - return m_durations; - } + return m_durations; + } public void setDurations(List durations) { - m_durations = durations; - } + m_durations = durations; + } public int getFailureCount() { - return m_failureCount; - } + return m_failureCount; + } public void setFailureCount(int failureCount) { - m_failureCount = failureCount; - } + m_failureCount = failureCount; + } public int getLongTimeCount() { - return m_longTimeCount; - } + return m_longTimeCount; + } public void setLongTimeCount(int longTimeCount) { - m_longTimeCount = longTimeCount; - } + m_longTimeCount = longTimeCount; + } public double getMax() { - return m_max; - } + return m_max; + } public void setMax(double max) { - m_max = max; - } + m_max = max; + } public double getMin() { - return m_min; - } + return m_min; + } public void setMin(double min) { - m_min = min; - } + m_min = min; + } public double getSum() { - return m_sum; - } + return m_sum; + } public void setSum(double sum) { - m_sum = sum; - } + m_sum = sum; + } public double getSum2() { - return m_sum2; - } + return m_sum2; + } public void setSum2(double sum2) { - m_sum2 = sum2; - } + m_sum2 = sum2; + } public Map getDurationDistribution() { - return m_durationDistribution; - } + return m_durationDistribution; + } public void setDurationDistribution(Map durationDistribution) { - m_durationDistribution = durationDistribution; - } + m_durationDistribution = durationDistribution; + } public Map getHitsOverTime() { - return m_hitsOverTime; - } + return m_hitsOverTime; + } public void setHitsOverTime(Map hitsOverTime) { - m_hitsOverTime = hitsOverTime; - } + m_hitsOverTime = hitsOverTime; + } public Map getDurationOverTime() { - return m_durationOverTime; - } + return m_durationOverTime; + } public void setDurationOverTime(Map durationOverTime) { - m_durationOverTime = durationOverTime; - } + m_durationOverTime = durationOverTime; + } public Map getFailureOverTime() { - return m_failureOverTime; - } + return m_failureOverTime; + } public void setFailureOverTime(Map failureOverTime) { - m_failureOverTime = failureOverTime; - } + m_failureOverTime = failureOverTime; + } @Override public void write(DataOutput arg0) throws IOException { diff --git a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java index 4ca245a6a3d819cd1709411155bf98d145c9246c..21d83bef62d98c94934f91d6e2cba01a0212255b 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java +++ b/cat-job/src/main/java/com/dianping/cat/job/sql/SqlStatementKey.java @@ -89,4 +89,12 @@ public class SqlStatementKey implements WritableComparable { m_name = new Text(name); return String.format("%s\t%s\t%s", m_domain, m_name, m_statement); } + + public String replaceBlank(String str){ + String name = str.toString(); + name = name.replaceAll("\\s", " "); + name = name.replaceAll("\t", " "); + name = name.replaceAll("\"", "\'"); + return name; + } } diff --git a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java index e33668f6052fe1f44ac7b561d57924a042a60611..fbfcba619aa457739ce8ac1c18de99d539f97935 100644 --- a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java +++ b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobDataProduceTest.java @@ -33,7 +33,7 @@ public class SqlJobDataProduceTest extends CatTestCase { long currentHour = currentTimeMillis - currentTimeMillis % (60 * 60 * 1000); for (int i = 0; i < 3; i++) { - for (int j = 0; j < 1200; j++) { + for (int j = 0; j < 12000; j++) { Transaction t = producer.newTransaction("URL", "MyPage" + (int) (j / 500)); try { diff --git a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobResultTest.java b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobResultTest.java index c9c881d2feb920e798ebe7cba43e00569ab598c0..8a9fdc3c1fcd6d1fd8072515a12f67d520d5a0e1 100644 --- a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobResultTest.java +++ b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlJobResultTest.java @@ -53,7 +53,7 @@ public class SqlJobResultTest { Assert.assertEquals(46200.0, sql.getSum2()); DecimalFormat df = new DecimalFormat("#.##"); - Assert.assertEquals("5.26", df.format(sql.getAvg())); + Assert.assertEquals("10", df.format(sql.getPercent95Line())); Map durationDistribution = sql.getDurationDistribution(); for (int i = 0; i <= 65536;) { diff --git a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java index 55122a321c2955bab7f090b99e5e8887c2b50087..a00ef843f246551934d6038d3a5d1692671df888 100644 --- a/cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java +++ b/cat-job/src/test/java/com/dianping/cat/job/sql/SqlRecordTest.java @@ -20,6 +20,16 @@ public class SqlRecordTest { Assert.assertEquals("1.23", df.format(1.234567)); } + @Test + public void testBlankReplace(){ + SqlStatementKey key = new SqlStatementKey(); + String excepted =""; + String str = "sdf 123 \nfddf 43 fd"; + str = key.replaceBlank(str); + String temp =str.replaceAll("\\s", "*"); + System.out.println(temp); + System.out.println(str); + } @Test public void testSplit() { String text = "domain1 SQLStatement-Internal9 insert into mysql where is='sfsdf' "+"" +