提交 94eddb03 编写于 作者: F Frankie Wu

fix bugs

上级 8a8b3552
......@@ -127,24 +127,20 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
public void doCheckpoint() throws IOException {
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction("Realtime", "Checkpoint");
Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint");
t.setStatus(Message.SUCCESS);
try {
for (Map.Entry<String, MessageAnalyzer> e : m_currentAnalyzers.entrySet()) {
Transaction t1 = cat.newTransaction("Checkpoint", e.getKey());
try {
e.getValue().doCheckpoint();
t1.setStatus(Message.SUCCESS);
} catch (Exception re) {
cat.logError(re);
t1.setStatus(re);
} finally {
t1.complete();
}
e.getValue().doCheckpoint();
}
} catch (IOException e) {
cat.logError(e);
t.setStatus(e);
} catch (RuntimeException e) {
cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
......
......@@ -15,6 +15,7 @@ import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.consumer.problem.handler.Handler;
import com.dianping.cat.consumer.problem.model.entity.JavaThread;
import com.dianping.cat.consumer.problem.model.entity.Machine;
......@@ -22,6 +23,9 @@ import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.entity.Segment;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
......@@ -73,8 +77,20 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint");
try {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
}
@Override
......
......@@ -15,6 +15,7 @@ import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.consumer.transaction.model.entity.Duration;
import com.dianping.cat.consumer.transaction.model.entity.Range;
import com.dianping.cat.consumer.transaction.model.entity.TransactionName;
......@@ -23,6 +24,7 @@ import com.dianping.cat.consumer.transaction.model.entity.TransactionType;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
......@@ -76,8 +78,20 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction(getClass().getSimpleName(), "checkpoint");
try {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
}
@Override
......
......@@ -32,11 +32,11 @@ public abstract class AbstractMessage implements Message {
if (m_data == null) {
m_data = keyValuePairs;
} else if (m_data instanceof StringBuilder) {
((StringBuilder) m_data).append(keyValuePairs);
((StringBuilder) m_data).append('&').append(keyValuePairs);
} else {
StringBuilder sb = new StringBuilder(m_data.length() + keyValuePairs.length() + 16);
sb.append(m_data);
sb.append(m_data).append('&');
sb.append(keyValuePairs);
m_data = sb;
}
......
......@@ -235,15 +235,17 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
public void end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.peek();
Transaction current = m_stack.pop();
if (transaction.equals(current)) {
if (transaction == current) {
validateTransaction(current);
} else {
throw new RuntimeException("Internal error: Transaction logging mismatched!");
}
while (transaction != current && !m_stack.empty()) {
validateTransaction(current);
m_stack.pop();
current = m_stack.pop();
}
}
if (m_stack.isEmpty()) {
MessageTree tree = m_tree.copy();
......@@ -279,20 +281,20 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
message.setStatus("unset");
}
if (!message.isCompleted() && message instanceof DefaultTransaction) {
DefaultTransaction t = (DefaultTransaction) message;
validateTransaction(t);
if (message instanceof Transaction) {
validateTransaction((Transaction) message);
}
}
// missing transaction end, log a BadInstrument event so that
// developer can fix the code
DefaultEvent notCompleteEvent = new DefaultEvent("CAT", "BadInstrument");
if (!transaction.isCompleted() && transaction instanceof DefaultTransaction) {
// missing transaction end, log a BadInstrument event so that
// developer can fix the code
DefaultEvent notCompleteEvent = new DefaultEvent("CAT", "BadInstrument");
notCompleteEvent.setStatus("TransactionNotCompleted");
notCompleteEvent.setCompleted(true);
transaction.addChild(notCompleteEvent);
t.setCompleted(true);
}
notCompleteEvent.setStatus("TransactionNotCompleted");
notCompleteEvent.setCompleted(true);
transaction.addChild(notCompleteEvent);
((DefaultTransaction) transaction).setCompleted(true);
}
}
}
......
......@@ -19,7 +19,7 @@ public interface Bucket<T> {
public Collection<String> getIdsByPrefix(String prefix);
public void initialize(Class<?> type, String name, Date timestamp) throws IOException;;
public void initialize(Class<?> type, String name, Date timestamp) throws IOException;
/**
* store the data by id into the bucket.
......@@ -29,5 +29,5 @@ public interface Bucket<T> {
* @return true means the data was stored in the bucket, otherwise false.
* @throws IOException
*/
public boolean storeById(String id, T data) throws IOException;;
public boolean storeById(String id, T data) throws IOException;
}
......@@ -18,6 +18,10 @@ import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.storage.Bucket;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
......@@ -260,48 +264,56 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
return false;
}
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction(getClass().getSimpleName(), "store");
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n";
byte[] firstLine;
byte[] num;
int length;
t.setStatus(Message.SUCCESS);
try {
encode(data, buf);
try {
encode(data, buf);
length = buf.readInt();
firstLine = attributes.getBytes("utf-8");
num = String.valueOf(length).getBytes("utf-8");
} catch (IOException e) {
m_logger.error(String.format("Error when preparing to write to file(%s)!", m_file), e);
t.setStatus(e);
return false;
}
length = buf.readInt();
firstLine = attributes.getBytes("utf-8");
num = String.valueOf(length).getBytes("utf-8");
} catch (Exception e) {
m_logger.error(String.format("Error when preparing to write to file(%s)!", m_file), e);
m_writeLock.lock();
return false;
}
try {
long offset = m_writeFile.getFilePointer();
m_writeLock.lock();
m_writeFile.write(firstLine);
m_writeFile.write(num);
m_writeFile.write('\n');
m_writeFile.write(buf.array(), buf.readerIndex(), length);
m_writeFile.write('\n');
try {
long offset = m_writeFile.getFilePointer();
if (isAutoFlush()) {
m_writeFile.getChannel().force(true);
}
m_writeFile.write(firstLine);
m_writeFile.write(num);
m_writeFile.write('\n');
m_writeFile.write(buf.array(), buf.readerIndex(), length);
m_writeFile.write('\n');
updateIndex(id, tags, offset);
if (isAutoFlush()) {
m_writeFile.getChannel().force(true);
return true;
} catch (Exception e) {
m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
t.setStatus(e);
return false;
} finally {
m_writeLock.unlock();
}
updateIndex(id, tags, offset);
return true;
} catch (Exception e) {
m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
return false;
} finally {
m_writeLock.unlock();
t.complete();
}
}
......
package com.dianping.cat.report.page.model.problem;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.Cat;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.transform.DefaultMerger;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultEvent;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
......@@ -36,27 +43,69 @@ public class CompositeProblemService implements ModelService<ProblemReport>, Ini
public ModelResponse<ProblemReport> invoke(final ModelRequest request) {
int size = m_services.size();
final List<ModelResponse<ProblemReport>> responses = new ArrayList<ModelResponse<ProblemReport>>(size);
final CountDownLatch latch = new CountDownLatch(size);
final Semaphore semaphore = new Semaphore(0);
final Transaction t = Cat.getProducer().newTransaction("ModelService", "Problem");
int count = 0;
t.setStatus(Message.SUCCESS);
t.addData("request", request);
for (final ModelService<ProblemReport> service : m_services) {
if (!service.isEligable(request)) {
continue;
}
m_threadPool.submit(new Runnable() {
@Override
public void run() {
try {
responses.add(service.invoke(request));
t.addData(service.toString());
} catch (Exception e) {
e.printStackTrace();
logError(t, e);
t.setStatus(e);
} finally {
latch.countDown();
semaphore.release();
}
}
void logError(Transaction t, Throwable cause) {
StringWriter writer = new StringWriter(2048);
cause.printStackTrace(new PrintWriter(writer));
if (cause instanceof Error) {
logEvent(t, "Error", cause.getClass().getName(), "ERROR", writer.toString());
} else if (cause instanceof RuntimeException) {
logEvent(t, "RuntimeException", cause.getClass().getName(), "ERROR", writer.toString());
} else {
logEvent(t, "Exception", cause.getClass().getName(), "ERROR", writer.toString());
}
}
void logEvent(Transaction t, String type, String name, String status, String nameValuePairs) {
Event event = new DefaultEvent(type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
event.addData(nameValuePairs);
}
event.setStatus(status);
event.complete();
t.addChild(event);
}
});
count++;
}
try {
latch.await(5000, TimeUnit.MILLISECONDS);
semaphore.tryAcquire(count, 5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore it
t.setStatus(e);
} finally {
t.complete();
}
ModelResponse<ProblemReport> aggregated = new ModelResponse<ProblemReport>();
......@@ -94,7 +143,6 @@ public class CompositeProblemService implements ModelService<ProblemReport>, Ini
public void setSerivces(ModelService<ProblemReport>... services) {
m_services = Arrays.asList(services);
}
/**
* Inject remote servers to load transaction model.
......
package com.dianping.cat.report.page.model.spi.internal;
import java.util.List;
import com.dianping.cat.consumer.RealtimeConsumer;
import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.lookup.annotation.Inject;
public class BaseLocalModelService implements ModelService<TransactionReport> {
@Inject(type = MessageConsumer.class, value = "realtime")
private RealtimeConsumer m_consumer;
@Override
public ModelResponse<TransactionReport> invoke(ModelRequest request) {
TransactionAnalyzer analyzer = getAnalyzer(request.getPeriod());
ModelResponse<TransactionReport> response = new ModelResponse<TransactionReport>();
if (analyzer != null) {
List<String> domains = analyzer.getDomains();
String d = request.getDomain();
String domain = d != null && d.length() > 0 ? d : domains.isEmpty() ? null : domains.get(0);
TransactionReport report = analyzer.getReport(domain);
if (report != null) {
report.getDomains().addAll(domains);
}
response.setModel(report);
}
return response;
}
private TransactionAnalyzer getAnalyzer(ModelPeriod period) {
if (period.isCurrent() || period.isFuture()) {
return (TransactionAnalyzer) m_consumer.getCurrentAnalyzer("transaction");
} else if (period.isLast()) {
return (TransactionAnalyzer) m_consumer.getLastAnalyzer("transaction");
} else {
throw new RuntimeException("Internal error: this method should not be called!");
}
}
@Override
public boolean isEligable(ModelRequest request) {
ModelPeriod period = request.getPeriod();
return !period.isHistorical();
}
}
package com.dianping.cat.report.page.model.spi.internal;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import org.xml.sax.SAXException;
import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.site.helper.Files;
import com.site.helper.Joiners;
import com.site.helper.Joiners.IBuilder;
import com.site.lookup.annotation.Inject;
public abstract class BaseRemoteModelService<T> implements ModelService<T> {
@Inject
private String m_host;
@Inject
private int m_port = 2281; // default admin port
@Inject
private String m_serviceUri = "/cat/r/model";
private String m_name;
public BaseRemoteModelService(String name) {
m_name = name;
}
protected URL buildUrl(ModelRequest request) throws MalformedURLException {
String pairs = Joiners.by('&').prefixDelimiter()
.join(request.getProperties().entrySet(), new IBuilder<Map.Entry<String, String>>() {
@Override
public String asString(Entry<String, String> e) {
return e.getKey() + "=" + e.getValue();
}
});
String url = String.format("http://%s:%s%s/%s/%s/%s?op=xml%s", m_host, m_port, m_serviceUri, m_name,
request.getDomain(), request.getPeriod(), pairs);
return new URL(url);
}
@Override
public ModelResponse<T> invoke(ModelRequest request) {
ModelResponse<T> response = new ModelResponse<T>();
try {
URL url = buildUrl(request);
String xml = Files.forIO().readFrom(url.openStream(), "utf-8");
if (xml != null && xml.trim().length() > 0) {
T report = parse(xml);
response.setModel(report);
}
} catch (Exception e) {
response.setException(e);
}
return response;
}
@Override
public boolean isEligable(ModelRequest request) {
ModelPeriod period = request.getPeriod();
return !period.isHistorical();
}
protected abstract T parse(String xml) throws SAXException, IOException;
public void setHost(String host) {
m_host = host;
}
public void setPort(int port) {
m_port = port;
}
public void setServiceUri(String serviceUri) {
m_serviceUri = serviceUri;
}
}
......@@ -9,6 +9,7 @@ import java.util.Map;
import javax.servlet.ServletException;
import com.dianping.cat.Cat;
import com.dianping.cat.consumer.problem.model.entity.Entry;
import com.dianping.cat.consumer.problem.model.entity.JavaThread;
import com.dianping.cat.consumer.problem.model.entity.Machine;
......@@ -79,7 +80,7 @@ public class Handler implements PageHandler<Context> {
return report;
} else {
throw new RuntimeException("Internal error: no eligable service registered for " + request + "!");
throw new RuntimeException("Internal error: no eligible service registered for " + request + "!");
}
}
......@@ -105,9 +106,15 @@ public class Handler implements PageHandler<Context> {
break;
case DETAIL:
showDetail(model, payload);
break;
}
m_jspViewer.view(ctx, model);
try {
m_jspViewer.view(ctx, model);
} catch (Throwable e) {
Cat.getProducer().logError(e);
e.printStackTrace();
}
}
private void showDetail(Model model, Payload payload) {
......@@ -115,28 +122,30 @@ public class Handler implements PageHandler<Context> {
Machine machine = report.getMachines().get(payload.getIpAddress());
JavaThread thread = machine.getThreads().get(payload.getThreadId());
Segment segment = thread.getSegments().get(payload.getMinute());
if (segment == null) {
model.setEntries(new ArrayList<Entry>());
model.setStatistics(new ArrayList<ProblemStatistics>());
return;
}
List<Entry> entries = segment.getEntries();
Map<String, ProblemStatistics> typeCounts = new HashMap<String, ProblemStatistics>();
for (Entry entry : entries) {
String type = entry.getType();
ProblemStatistics staticstics = typeCounts.get(type);
if (staticstics != null) {
staticstics.setCount(staticstics.getCount() + 1);
} else {
ProblemStatistics temp = new ProblemStatistics();
temp.setCount(1).setType(type);
typeCounts.put(type, temp);
}
}
model.setEntries(entries);
model.setStatistics(new ArrayList<ProblemStatistics>(typeCounts.values()));
}
......
......@@ -3,7 +3,7 @@ package com.dianping.cat.report.page.problem;
public enum JspFile {
VIEW("/jsp/report/problem.jsp"),
DETAIL("jsp/report/problemDetail.jsp")
DETAIL("/jsp/report/problemDetail.jsp")
;
private String m_path;
......
......@@ -25,7 +25,7 @@
</tr>
</c:forEach>
</table>
${model.xxx}
</jsp:body>
</a:report>
......@@ -25,6 +25,8 @@ public class RemoteStringBucket implements Bucket<String>, LogEnabled {
private Logger m_logger;
private String m_name;
@Override
public void close() throws IOException {
}
......@@ -94,32 +96,24 @@ public class RemoteStringBucket implements Bucket<String>, LogEnabled {
@Override
public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
m_period = timestamp;
m_name = name;
}
@Override
public boolean storeById(String id, String data) throws IOException {
public boolean storeById(String domain, String data) throws IOException {
Report report = m_reportDao.createLocal();
int pos = id.indexOf('-');
if (pos > 0) {
String name = id.substring(0, pos);
String domain = id.substring(pos + 1);
report.setName(name);
report.setDomain(domain);
report.setType(1);
report.setContent(data);
report.setPeriod(m_period);
report.setName(m_name);
report.setDomain(domain);
report.setType(1);
report.setContent(data);
report.setPeriod(m_period);
try {
m_reportDao.insert(report);
try {
m_reportDao.insert(report);
return true;
} catch (DalException e) {
throw new IOException(String.format("Unable to insert report(%s)!", id), e);
}
return true;
} catch (DalException e) {
throw new IOException(String.format("Unable to insert report(%s)!", domain), e);
}
return false;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册