提交 e7c1b38d 编写于 作者: Y You Yong

modify the cat

上级 e972b10e
......@@ -68,7 +68,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers"));
all.add(C(MessageQueue.class, DefaultMessageQueue.class) //
.config(E("size").value("100000")) //
.config(E("size").value("1000")) //
.is(PER_LOOKUP));
all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) //
......
......@@ -22,7 +22,7 @@ public class DefaultMessageQueue implements MessageQueue, Initializable {
if (m_size > 0) {
m_queue = new LinkedBlockingQueue<MessageTree>(m_size);
}else{
m_queue = new LinkedBlockingQueue<MessageTree>();
m_queue = new LinkedBlockingQueue<MessageTree>(10000);
}
}
......
......@@ -150,8 +150,9 @@ public class TcpSocketHierarchySender extends Thread implements MessageSender, L
m_serverAddresses = serverAddresses;
m_futures = new ArrayList<ChannelFuture>(Collections.<ChannelFuture> nCopies(len, null));
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newFixedThreadPool(10),
Executors.newFixedThreadPool(10));
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
......
......@@ -52,6 +52,10 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled
private transient boolean m_active;
private static int s_error;
private static int s_write;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
......@@ -63,8 +67,8 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled
throw new RuntimeException("No server address was configured for TcpSocketSender!");
}
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newFixedThreadPool(10),
Executors.newFixedThreadPool(10));
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
......@@ -121,11 +125,29 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled
while (m_active) {
try {
MessageTree tree = m_queue.poll();
boolean isWriteable = false;
if (m_future != null && m_future.getChannel().isOpen()) {
if (m_future.getChannel().isWritable()) {
isWriteable = true;
} else {
try {
s_write++;
if (s_write % 1000 == 0) {
System.out.println("Can't write!");
}
Thread.sleep(100);
} catch (Exception e) {
break;
}
}
}
if (isWriteable) {
MessageTree tree = m_queue.poll();
if (tree != null) {
sendInternal(tree);
tree.setMessage(null);
if (tree != null) {
sendInternal(tree);
tree.setMessage(null);
}
}
} catch (Throwable t) {
m_logger.error("Error when sending message over TCP socket!", t);
......@@ -144,8 +166,10 @@ public class TcpSocketSender extends Thread implements MessageSender, LogEnabled
if (m_statistics != null) {
m_statistics.onOverflowed(tree);
}
m_logger.error("Message queue is full in tcp socket sender!");
s_error++;
if (s_error % 100 == 0) {
m_logger.error("Message queue is full in tcp socket sender! Number: " + s_error);
}
}
}
......
package com.dianping.cat.status;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
......@@ -20,7 +23,7 @@ public class StatusUpdateTask implements Runnable, Initializable {
private String m_ipAddress;
private long m_interval = 1000; // 1 ms
private long m_interval = 1000*30; // 1 ms
@Override
public void initialize() throws InitializationException {
......@@ -39,7 +42,8 @@ public class StatusUpdateTask implements Runnable, Initializable {
heartbeat.addData(status.toString());
heartbeat.setStatus(Message.SUCCESS);
heartbeat.complete();
System.out.println(new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()));
System.out.println(status);
long elapsed = MilliSecondTimer.currentTimeMillis() - start;
if (elapsed < m_interval) {
......
......@@ -6,11 +6,14 @@
.sql-table .odd td {
background-color: #eee;
font-size: small;
text-align: center;
}
.sql-table .even td {
background-color: white;
font-size: small;
text-align: center;
}
.sql-title {
background-color: #F2F2F2;
font-size: medium;
}
\ No newline at end of file
......@@ -23,7 +23,7 @@
<tr>
<th>Machines:</th>
<c:forEach var="machine" items="${report.machines}">
<td><a href="?domain=${model.domain}&ip=${model.ipAddress}">${machine.value.ip}</a></td>
<td><a href="?domain=${model.domain}&ip=${machine.value.ip}">${machine.value.ip}</a></td>
</c:forEach>
</tr>
</table>
......
......@@ -23,7 +23,7 @@
<tr>
<th>Machines:</th>
<c:forEach var="machine" items="${report.machines}">
<td><a href="?domain=${model.domain}&ip=${model.ipAddress}">${machine.value.ip}</a></td>
<td><a href="?domain=${model.domain}&ip=${machine.value.ip}">${machine.value.ip}</a></td>
</c:forEach>
</tr>
</table>
......
......@@ -37,15 +37,15 @@
<c:forEach var="reportRecord" items="${model.report.reportRecords}" varStatus="status">
<tr class="${status.index mod 2 != 0 ? 'odd' : 'even'}">
<td><a href="?op=graphs&id=${reportRecord.record.id}" onclick="return showGraphs(this,${status.index},'${reportRecord.record.id}');">[:: show ::]</a> ${reportRecord.record.name}</td>
<td>${w:format(reportRecord.record.totalcount,'0.00')}</td>
<td>${w:format(reportRecord.record.failures,'0.00')}</td>
<td>${w:format(reportRecord.record.totalCount,'0.00')}</td>
<td>${w:format(reportRecord.record.failureCount,'0.00')}</td>
<td>${w:format(reportRecord.failurePercent,'0.00%')}</td>
<td>${w:format(reportRecord.record.minvalue,'0.00')} / ${w:format(reportRecord.record.maxvalue,'0.00')} / ${w:format(reportRecord.avg,'0.00')} / ${w:format(reportRecord.std,'0.00')}</td>
<td>${w:format(reportRecord.record.avg2value,'0.00')}</td>
<td>${w:format(reportRecord.record.sumvalue,'0.00')}</td>
<td>${w:format(reportRecord.record.longsqls,'0.00')}</td>
<td>${w:format(reportRecord.record.minValue,'0.00')} / ${w:format(reportRecord.record.maxValue,'0.00')} / ${w:format(reportRecord.avg,'0.00')} / ${w:format(reportRecord.std,'0.00')}</td>
<td>${w:format(reportRecord.record.avg2Value,'0.00')}</td>
<td>${w:format(reportRecord.record.sumValue,'0.00')}</td>
<td>${w:format(reportRecord.record.longSqls,'0.00')}</td>
<td>${w:format(reportRecord.longPercent,'0.00%')}</td>
<td><a href='cat/r/m/${reportRecord.record.samplelink}/logview.html'>Link</a></td>
<td><a href='cat/r/m/${reportRecord.record.sampleLink}/logview.html'>Link</a></td>
</tr>
<tr class="graphs"><td colspan="10" align="center"><div id="${status.index}" style="display:none"></div></td></tr>
</c:forEach>
......
<%@ page contentType="text/html; charset=utf-8"%>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<jsp:useBean id="model" type="com.dianping.cat.report.page.sql.Model" scope="request" />
<div class="sql-title">
SQL Statement: ${model.statement}
</div>
<svg version="1.1" width="980" height="380" xmlns="http://www.w3.org/2000/svg">
${model.graph1}
${model.graph2}
......
......@@ -30,6 +30,7 @@ public class DefaultOutputChannel implements OutputChannel {
public void close() {
if (m_out != null) {
try {
m_out.flush();
m_out.close();
m_out = null;
} catch (IOException e) {
......
......@@ -40,6 +40,8 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos
} catch (InterruptedException e) {
// ignore it
}
this.m_manager.closeAllChannels();
}
@Override
......
......@@ -72,22 +72,25 @@ public class MessageTreeReader extends RecordReader<LongWritable, MessageTreeWri
// open the file and seek to the start of the split
Path file = split.getPath();
System.out.println("Starting process: " + file.getName());
CompressionCodec codec = m_compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(config);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
m_in = new BlockReader(codec.createInputStream(fileIn), config);
m_in = new BlockReader(file, codec.createInputStream(fileIn), config);
m_end = Long.MAX_VALUE;
} else {
if (m_start != 0) {
skipFirstLine = true;
--m_start;
m_start--;
fileIn.seek(m_start);
}
m_in = new BlockReader(fileIn, config);
m_in = new BlockReader(file, fileIn, config);
}
if (skipFirstLine) { // skip first line and re-establish "start".
......@@ -113,7 +116,12 @@ public class MessageTreeReader extends RecordReader<LongWritable, MessageTreeWri
if (m_pos < m_end) {
blockSize = m_in.readBlock(m_value);
m_pos += blockSize;
if (!m_value.isCompleted()) {
return false;
}
}
if (blockSize == 0) {
......@@ -130,9 +138,12 @@ public class MessageTreeReader extends RecordReader<LongWritable, MessageTreeWri
private PlainTextMessageCodec m_codec;
public BlockReader(InputStream in, Configuration config) {
private Path m_file;
public BlockReader(Path file, InputStream in, Configuration config) {
int bufferSize = config.getInt("io.file.buffer.size", 8192);
m_file = file;
m_in = new BufferedInputStream(in, bufferSize);
m_codec = new PlainTextMessageCodec();
m_codec.setBufferWriter(new EscapingBufferWriter());
......@@ -179,7 +190,14 @@ public class MessageTreeReader extends RecordReader<LongWritable, MessageTreeWri
prev = b;
}
m_codec.decode(buf, tree.get());
try {
m_codec.decode(buf, tree.get());
tree.complete();
} catch (Throwable e) {
System.out.println("Error when parsing file: " + m_file);
e.printStackTrace(System.out);
System.out.println("The message tree is: " + tree.get());
}
return count;
}
......
......@@ -12,10 +12,20 @@ import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class MessageTreeWritable implements Writable {
private MessageTree m_tree = new DefaultMessageTree();
private boolean m_completed;
public void complete() {
m_completed = true;
}
public MessageTree get() {
return m_tree;
}
public boolean isCompleted() {
return m_completed;
}
@Override
public void readFields(DataInput in) throws IOException {
throw new UnsupportedOperationException(
......
......@@ -64,6 +64,7 @@ public abstract class DirectoryInputFormat<K, V> extends FileInputFormat<K, V> {
addFileStat(result, inputFilter, fs, stat);
}
} else {
System.out.println(globStat.getPath().getName());
result.add(globStat);
}
}
......
......@@ -37,11 +37,9 @@ public class SqlJobMain extends Configured implements Tool {
* the number of reduce; The args[1] is for input path
*/
public static void main(String[] args) throws Exception {
// int exitCode = ToolRunner.run(new Configuration(), new SqlJobMain(), args);
// System.exit(exitCode);
int exitCode = ToolRunner.run(new Configuration(), new SqlJobMain(), args);
System.exit(exitCode);
}
private String getLastHoursString(int hours) {
......@@ -55,8 +53,6 @@ public class SqlJobMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
runSqlRecordJob();
Configuration conf = getConf();
Job job = new Job(conf, "Sql Analyzer");
......@@ -92,7 +88,7 @@ public class SqlJobMain extends Configured implements Tool {
if (BASE_URL.charAt(BASE_URL.length() - 1) == '/') {
BASE_URL = BASE_URL.substring(0, BASE_URL.length() - 1);
}
DEFAULT_IN_PATH = BASE_URL + "/dump/";
DEFAULT_IN_PATH = BASE_URL;
DEFAULT_OUT_PATH = BASE_URL + "/sql/";
DEFAULT_FINAL_PATH = BASE_URL + "/sqlResult/";
}
......@@ -116,8 +112,6 @@ public class SqlJobMain extends Configured implements Tool {
} else {
return 0;
}
// String hourStr = getLastHoursString(1);
// return runSqlRecordJob(hourStr);
}
/*
......@@ -139,23 +133,4 @@ public class SqlJobMain extends Configured implements Tool {
Files.forDir().delete(new File(DEFAULT_FINAL_PATH), true);
return job.waitForCompletion(true) ? 0 : 1;
}
private int runSqlRecordJob() throws Exception {
System.out.println("Insert database job start!");
Configuration conf = getConf();
conf.set("JobHour", "20120328/19");
Job job = new Job(conf, "Sql Record");
job.setJarByClass(SqlJobMain.class);
job.setMapperClass(SqlRecordJobMapper.class);
job.setReducerClass(SqlRecordJobReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("target/Test"));
String output = "target/test2";
FileOutputFormat.setOutputPath(job, new Path(output));
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(output), true);
return job.waitForCompletion(true) ? 0 : 1;
}
}
......@@ -104,9 +104,12 @@ public class SqlJobResult implements Writable {
return (int) Math.pow(2, min);
}
public double getAvg() {
//get the
public double getPercent95Line() {
Collections.sort(m_durations);
int size = 95 * m_durations.size() / 100;
return m_durations.get(size);
/*
double sum = 0;
for (int i = 0; i < size; i++) {
......@@ -114,7 +117,8 @@ public class SqlJobResult implements Writable {
}
return sum / (double) size;
}
*/
}
@Override
public void readFields(DataInput arg0) throws IOException {
......@@ -125,10 +129,14 @@ public class SqlJobResult implements Writable {
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append(m_durations.size()).append(SPIT).append(m_failureCount).append(SPIT).append(m_longTimeCount)
.append(SPIT);
sb.append(df.format(m_min)).append(SPIT).append(df.format(m_max)).append(SPIT).append(df.format(m_sum))
.append(SPIT).append(df.format(m_sum2)).append(SPIT).append(df.format(getAvg())).append(SPIT);
sb.append(m_durations.size()).append(SPIT);
sb.append(m_failureCount).append(SPIT);
sb.append(m_longTimeCount).append(SPIT);
sb.append(df.format(m_min)).append(SPIT);
sb.append(df.format(m_max)).append(SPIT);
sb.append(df.format(m_sum)).append(SPIT);
sb.append(df.format(m_sum2)).append(SPIT);
sb.append(df.format(getPercent95Line())).append(SPIT);
int size = m_urls.size();
......@@ -173,92 +181,92 @@ public class SqlJobResult implements Writable {
}
public List<Double> getDurations() {
return m_durations;
}
return m_durations;
}
public void setDurations(List<Double> durations) {
m_durations = durations;
}
m_durations = durations;
}
public int getFailureCount() {
return m_failureCount;
}
return m_failureCount;
}
public void setFailureCount(int failureCount) {
m_failureCount = failureCount;
}
m_failureCount = failureCount;
}
public int getLongTimeCount() {
return m_longTimeCount;
}
return m_longTimeCount;
}
public void setLongTimeCount(int longTimeCount) {
m_longTimeCount = longTimeCount;
}
m_longTimeCount = longTimeCount;
}
public double getMax() {
return m_max;
}
return m_max;
}
public void setMax(double max) {
m_max = max;
}
m_max = max;
}
public double getMin() {
return m_min;
}
return m_min;
}
public void setMin(double min) {
m_min = min;
}
m_min = min;
}
public double getSum() {
return m_sum;
}
return m_sum;
}
public void setSum(double sum) {
m_sum = sum;
}
m_sum = sum;
}
public double getSum2() {
return m_sum2;
}
return m_sum2;
}
public void setSum2(double sum2) {
m_sum2 = sum2;
}
m_sum2 = sum2;
}
public Map<Integer, Integer> getDurationDistribution() {
return m_durationDistribution;
}
return m_durationDistribution;
}
public void setDurationDistribution(Map<Integer, Integer> durationDistribution) {
m_durationDistribution = durationDistribution;
}
m_durationDistribution = durationDistribution;
}
public Map<Integer, Integer> getHitsOverTime() {
return m_hitsOverTime;
}
return m_hitsOverTime;
}
public void setHitsOverTime(Map<Integer, Integer> hitsOverTime) {
m_hitsOverTime = hitsOverTime;
}
m_hitsOverTime = hitsOverTime;
}
public Map<Integer, Double> getDurationOverTime() {
return m_durationOverTime;
}
return m_durationOverTime;
}
public void setDurationOverTime(Map<Integer, Double> durationOverTime) {
m_durationOverTime = durationOverTime;
}
m_durationOverTime = durationOverTime;
}
public Map<Integer, Integer> getFailureOverTime() {
return m_failureOverTime;
}
return m_failureOverTime;
}
public void setFailureOverTime(Map<Integer, Integer> failureOverTime) {
m_failureOverTime = failureOverTime;
}
m_failureOverTime = failureOverTime;
}
@Override
public void write(DataOutput arg0) throws IOException {
......
......@@ -89,4 +89,12 @@ public class SqlStatementKey implements WritableComparable<SqlStatementKey> {
m_name = new Text(name);
return String.format("%s\t%s\t%s", m_domain, m_name, m_statement);
}
public String replaceBlank(String str){
String name = str.toString();
name = name.replaceAll("\\s", " ");
name = name.replaceAll("\t", " ");
name = name.replaceAll("\"", "\'");
return name;
}
}
......@@ -33,7 +33,7 @@ public class SqlJobDataProduceTest extends CatTestCase {
long currentHour = currentTimeMillis - currentTimeMillis % (60 * 60 * 1000);
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 1200; j++) {
for (int j = 0; j < 12000; j++) {
Transaction t = producer.newTransaction("URL", "MyPage" + (int) (j / 500));
try {
......
......@@ -53,7 +53,7 @@ public class SqlJobResultTest {
Assert.assertEquals(46200.0, sql.getSum2());
DecimalFormat df = new DecimalFormat("#.##");
Assert.assertEquals("5.26", df.format(sql.getAvg()));
Assert.assertEquals("10", df.format(sql.getPercent95Line()));
Map<Integer, Integer> durationDistribution = sql.getDurationDistribution();
for (int i = 0; i <= 65536;) {
......
......@@ -20,6 +20,16 @@ public class SqlRecordTest {
Assert.assertEquals("1.23", df.format(1.234567));
}
@Test
public void testBlankReplace(){
SqlStatementKey key = new SqlStatementKey();
String excepted ="";
String str = "sdf 123 \nfddf 43 fd";
str = key.replaceBlank(str);
String temp =str.replaceAll("\\s", "*");
System.out.println(temp);
System.out.println(str);
}
@Test
public void testSplit() {
String text = "domain1 SQLStatement-Internal9 insert into mysql where is='sfsdf' "+"" +
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册