提交 d4eef005 编写于 作者: F Frankie Wu

small enhancement on transaction report sorting

- remove local-mode from ConsumerConfig
- remove two remote buckets since it's not used any more
- add '/' mapping web.xml
上级 1f4d636b
......@@ -142,7 +142,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
m_periodManager.setName("RealtimeConsumer-PeriodManager");
m_periodManager.start();
if (m_uploader != null) {
if (m_uploader != null && !m_uploader.isLocalMode()) {
Thread uploadThread = new Thread(m_uploader);
uploadThread.setName("LogviewUploader");
......
......@@ -53,6 +53,8 @@ public class LogviewUploader implements Runnable, Initializable, LogEnabled {
private Logger m_logger;
private boolean m_localMode = true;
public void addBucket(long timestamp, String domain) {
m_todoList.offer(timestamp + ":" + domain);
}
......@@ -68,11 +70,17 @@ public class LogviewUploader implements Runnable, Initializable, LogEnabled {
if (serverConfig != null) {
m_baseDir = serverConfig.getStorage().getLocalBaseDir();
m_localMode = serverConfig.getLocalMode();
}
m_todoList = new TodoList(new File(m_baseDir, "TODO"), m_logger);
}
// TODO temporary way
public boolean isLocalMode() {
return m_localMode;
}
@Override
public void run() {
try {
......@@ -91,6 +99,8 @@ public class LogviewUploader implements Runnable, Initializable, LogEnabled {
throw e;
}
Thread.sleep(100);
}
} catch (Exception e) {
m_logger.error("Error when uploading bucket.", e);
......
......@@ -35,6 +35,10 @@ public class ServerConfigManager implements LogEnabled {
config.accept(new ServerConfigValidator());
m_config = config;
}
if (m_config.isLocalMode()) {
m_logger.warn("CAT server is running in LOCAL mode! No HDFS and MySQL will be accessed!");
}
}
public ServerConfig getServerConfig() {
......
......@@ -23,7 +23,6 @@
<attribute name="value" value-type="String" />
</entity>
<entity name="consumer">
<attribute name="local-mode" value-type="boolean" />
<entity-ref name="long-url" />
</entity>
<entity name="long-url">
......
......@@ -4,8 +4,6 @@ import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.hadoop.dal.LogviewDao;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.hadoop.hdfs.DefaultInputChannel;
import com.dianping.cat.hadoop.hdfs.DefaultInputChannelManager;
import com.dianping.cat.hadoop.hdfs.DefaultOutputChannel;
......@@ -16,13 +14,9 @@ import com.dianping.cat.hadoop.hdfs.InputChannel;
import com.dianping.cat.hadoop.hdfs.InputChannelManager;
import com.dianping.cat.hadoop.hdfs.OutputChannel;
import com.dianping.cat.hadoop.hdfs.OutputChannelManager;
import com.dianping.cat.hadoop.storage.RemoteMessageBucket;
import com.dianping.cat.hadoop.storage.RemoteStringBucket;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -47,14 +41,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(OutputChannelManager.class) //
.req(MessagePathBuilder.class));
all.add(C(Bucket.class, String.class.getName() + "-remote", RemoteStringBucket.class) //
.is(PER_LOOKUP) //
.req(ReportDao.class));
all.add(C(Bucket.class, MessageTree.class.getName() + "-remote", RemoteMessageBucket.class) //
.is(PER_LOOKUP) //
.req(OutputChannelManager.class, InputChannelManager.class) //
.req(LogviewDao.class, MessagePathBuilder.class));
all.addAll(new DatabaseConfigurator().defineComponents());
return all;
......
package com.dianping.cat.hadoop.storage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.hadoop.dal.Logview;
import com.dianping.cat.hadoop.dal.LogviewDao;
import com.dianping.cat.hadoop.dal.LogviewEntity;
import com.dianping.cat.hadoop.hdfs.InputChannel;
import com.dianping.cat.hadoop.hdfs.InputChannelManager;
import com.dianping.cat.hadoop.hdfs.OutputChannel;
import com.dianping.cat.hadoop.hdfs.OutputChannelManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.dal.jdbc.DalException;
import com.site.lookup.annotation.Inject;
/**
 * A {@link Bucket} of {@link MessageTree}s backed by a remote store: message bodies are
 * appended to an HDFS-style output channel while their index rows (path/offset/length and
 * search tags) are inserted via {@link LogviewDao}. Writes are batched asynchronously by a
 * background {@link MessageUploadWorker} thread started in {@link #initialize}.
 */
public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
   @Inject
   private OutputChannelManager m_outputChannelManager;

   @Inject
   private InputChannelManager m_inputChannelManager;

   @Inject
   private MessagePathBuilder m_pathBuilder;

   @Inject
   private LogviewDao m_logviewDao;

   private OutputChannel m_outputChannel;

   private String m_path;

   private MessageUploadWorker m_worker;

   // Access-ordered LRU map holding the ids of the last 100 stored messages; used by
   // storeById() to drop duplicate uploads. Guarded by its own monitor (see storeById):
   // an access-ordered LinkedHashMap mutates its internal links even on reads, so
   // unsynchronized concurrent access can corrupt it.
   private Map<String, String> m_lruCache = new LinkedHashMap<String, String>(100, 0.75f, true) {
      private static final long serialVersionUID = 1L;

      @Override
      protected boolean removeEldestEntry(Entry<String, String> eldest) {
         return size() > 100;
      }
   };

   private Logger m_logger;

   @Override
   public void close() throws IOException {
      // Stop the background upload thread first, otherwise it may keep writing to a
      // channel we are about to close. (The original close() leaked the worker thread.)
      if (m_worker != null) {
         m_worker.shutdown();
      }

      m_outputChannelManager.closeChannel(m_outputChannel);
   }

   @Override
   public void enableLogging(Logger logger) {
      m_logger = logger;
   }

   /**
    * Looks up the index row for the given message id and reads the message back from
    * its data channel.
    *
    * @return the message tree, or {@code null} when the index lookup fails
    */
   @Override
   public MessageTree findById(String id) throws IOException {
      try {
         Logview logview = m_logviewDao.findByMessageId(id, LogviewEntity.READSET_FULL);

         return readMessageTree(logview);
      } catch (DalException e) {
         m_logger.error(String.format("Unable to find message(%s)!", id), e);
         return null;
      }
   }

   /**
    * Finds the neighboring message of {@code id} sharing the given tag.
    *
    * @param tagName tag with a "r:", "s:" or "t:" prefix selecting the request, session
    *           or thread tag column respectively
    * @param direction {@code true} for the next message, {@code false} for the previous
    * @return the neighboring message tree, or {@code null} when none is found
    */
   protected MessageTree findByIdAndTag(String id, String tagName, boolean direction) throws IOException {
      String tagThread = null;
      String tagSession = null;
      String tagRequest = null;

      if (tagName.startsWith("r:")) {
         tagRequest = tagName;
      } else if (tagName.startsWith("s:")) {
         tagSession = tagName;
      } else if (tagName.startsWith("t:")) {
         tagThread = tagName;
      }

      try {
         Logview logview = m_logviewDao.findNextByMessageIdTags(id, direction, tagThread, tagSession, tagRequest,
               LogviewEntity.READSET_FULL);

         return readMessageTree(logview);
      } catch (DalException e) {
         String message = String.format("Unable to find next message(%s) with tag(%s) and direction(%s)!", id,
               tagName, direction);

         m_logger.error(message, e);
         return null;
      }
   }

   @Override
   public MessageTree findNextById(String id, String tagName) throws IOException {
      return findByIdAndTag(id, tagName, true);
   }

   @Override
   public MessageTree findPreviousById(String id, String tagName) throws IOException {
      return findByIdAndTag(id, tagName, false);
   }

   @Override
   public void flush() throws IOException {
      // Flushing is delegated to the channel manager; nothing to do here.
   }

   @Override
   public Collection<String> getIds() {
      throw new UnsupportedOperationException();
   }

   /**
    * Opens the output channel for this bucket and starts the background upload worker.
    * The channel path is made unique per host and per instantiation by appending the
    * local IP address and the current timestamp.
    */
   @Override
   public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
      String ipAddress = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
      String logicalPath = m_pathBuilder.getMessagePath(name, timestamp);

      m_path = logicalPath + "-" + ipAddress + "-" + System.currentTimeMillis();
      m_outputChannel = m_outputChannelManager.openChannel("logview", m_path, false);
      m_worker = new MessageUploadWorker();
      m_worker.setName("MessageUploadWorker");
      m_worker.start();
   }

   /**
    * Reads one message back from the data channel described by the given index row.
    */
   protected MessageTree readMessageTree(Logview logview) throws IOException {
      InputChannel inputChannel = null;

      try {
         String path = logview.getDataPath();
         long offset = logview.getDataOffset();
         int length = logview.getDataLength();

         inputChannel = m_inputChannelManager.openChannel("logview", path);
         return inputChannel.read(offset, length);
      } finally {
         if (inputChannel != null) {
            m_inputChannelManager.closeChannel(inputChannel);
         }
      }
   }

   /**
    * Queues the message for asynchronous upload, de-duplicating by message id via the
    * LRU cache. Note the {@code id} parameter is ignored; {@code tree.getMessageId()}
    * is authoritative.
    *
    * @return {@code false} when the message was recently stored or the queue is full
    */
   @Override
   public boolean storeById(String id, MessageTree tree) throws IOException {
      String messageId = tree.getMessageId();

      synchronized (m_lruCache) {
         if (m_lruCache.containsKey(messageId)) {
            return false;
         }

         m_lruCache.put(messageId, messageId);
      }

      return m_worker.add(tree);
   }

   /**
    * Background thread draining queued messages in small batches and uploading each
    * batch (channel write + index insert).
    */
   class MessageUploadWorker extends Thread {
      private BlockingQueue<MessageTree> m_queue = new LinkedBlockingQueue<MessageTree>(10000);

      // volatile so a shutdown() issued from another thread is seen by run()
      private volatile boolean m_active = true;

      public boolean add(MessageTree tree) {
         return m_queue.offer(tree);
      }

      @Override
      public void run() {
         List<MessageTree> trees = new ArrayList<MessageTree>();

         try {
            while (m_active) {
               // drain up to 10 messages per batch
               for (int i = 0; i < 10; i++) {
                  MessageTree tree = m_queue.poll();

                  if (tree != null) {
                     trees.add(tree);
                  } else {
                     break;
                  }
               }

               if (trees.size() > 0) {
                  uploadMessage(trees);
                  trees.clear();
               } else {
                  Thread.sleep(10);
               }
            }
         } catch (InterruptedException e) {
            // restore the interrupt flag so other observers can see it
            Thread.currentThread().interrupt();
         }
      }

      public void shutdown() {
         // BUG FIX: this used to assign "m_active = true", which made shutdown() a
         // no-op and leaked the worker thread forever.
         m_active = false;
      }

      /**
       * Writes each message to the output channel and inserts one index row per
       * message, wrapped in a CAT transaction for monitoring.
       */
      void uploadMessage(List<MessageTree> trees) {
         MessageProducer cat = Cat.getProducer();
         Transaction t = cat.newTransaction("Bucket", getClass().getSimpleName());
         List<Logview> logviews = new ArrayList<Logview>();

         try {
            for (MessageTree tree : trees) {
               String messageId = tree.getMessageId();
               // capture the offset BEFORE writing so the index row points at the
               // start of this message's bytes
               int offset = m_outputChannel.getSize();
               int length = m_outputChannel.write(tree);
               Logview logview = m_logviewDao.createLocal();

               logview.setMessageId(messageId);
               logview.setDataPath(m_path);
               logview.setDataOffset(offset);
               logview.setDataLength(length);
               logview.setTagThread("t:" + tree.getThreadId());
               logview.setTagSession("s:" + tree.getSessionToken());
               logview.setTagRequest("r:" + messageId);
               logviews.add(logview);
            }

            m_logviewDao.insert(logviews.toArray(new Logview[0]));
            t.setStatus(Message.SUCCESS);
         } catch (Exception e) {
            cat.logError(e);
            t.setStatus(e);
            // use the container logger instead of printStackTrace()
            m_logger.error("Error when uploading logview batch.", e);
         } finally {
            t.complete();
         }
      }
   }
}
package com.dianping.cat.hadoop.storage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.hadoop.dal.ReportEntity;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.storage.Bucket;
import com.site.dal.jdbc.DalException;
import com.site.lookup.annotation.Inject;
/**
 * A {@link Bucket} of report content strings backed by the {@code report} database table
 * via {@link ReportDao}. The bucket is scoped to one report name and one time period
 * (set in {@link #initialize}); each stored entry is one report row keyed by domain.
 */
public class RemoteStringBucket implements Bucket<String>, LogEnabled {
   @Inject
   private ReportDao m_reportDao;

   // time period this bucket covers, set by initialize()
   private Date m_period;

   private Logger m_logger;

   // report name this bucket covers, set by initialize()
   private String m_name;

   @Override
   public void close() throws IOException {
      // Nothing to release: each operation uses the DAO directly.
   }

   @Override
   public void enableLogging(Logger logger) {
      m_logger = logger;
   }

   @Override
   public String findById(String domain) throws IOException {
      throw new UnsupportedOperationException();
   }

   /**
    * Fetches the contents of all reports matching this bucket's period/name and the
    * given domain.
    *
    * @throws IOException when the database lookup fails
    */
   public List<String> findAllById(String domain) throws IOException {
      try {
         List<Report> reports = m_reportDao.findAllByPeriodDomainTypeName(m_period, domain, 1, m_name,
               ReportEntity.READSET_FULL);
         List<String> contents = new ArrayList<String>(reports.size());

         for (Report r : reports) {
            contents.add(r.getContent());
         }

         return contents;
      } catch (DalException e) {
         // BUG FIX: the message used to say "Unable to insert report" on this read path
         throw new IOException(String.format("Unable to find reports(name=%s, domain=%s)!", m_name, domain), e);
      }
   }

   @Override
   public String findNextById(String id, String tag) throws IOException {
      throw new UnsupportedOperationException();
   }

   @Override
   public String findPreviousById(String id, String tag) throws IOException {
      throw new UnsupportedOperationException();
   }

   @Override
   public void flush() throws IOException {
      // No buffering: storeById() writes straight through to the database.
   }

   @Override
   public Collection<String> getIds() {
      // NOTE(review): this queries with an empty name rather than m_name, producing ids
      // like "-<domain>". Looks like it should use m_name — confirm against callers
      // before changing; behavior is preserved here.
      String name = "";

      try {
         List<Report> reports = m_reportDao.findAllByPeriodTypeName(m_period, 1, name, ReportEntity.READSET_FULL);
         List<String> ids = new ArrayList<String>(reports.size());

         for (Report report : reports) {
            ids.add(name + "-" + report.getDomain());
         }

         return ids;
      } catch (DalException e) {
         m_logger.error(String.format("Unable to get ids by prefix(%s)!", name), e);
      }

      return Collections.emptyList();
   }

   @Override
   public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
      m_period = timestamp;
      m_name = name;
   }

   /**
    * Inserts one report row for the given domain with this bucket's name/period and
    * the local host's IP, wrapped in a CAT transaction for monitoring.
    *
    * @return {@code true} on a successful insert
    * @throws IOException when the insert fails
    */
   @Override
   public boolean storeById(String domain, String data) throws IOException {
      Transaction t = Cat.getProducer().newTransaction("Bucket", getClass().getSimpleName());
      Report report = m_reportDao.createLocal();

      report.setName(m_name);
      report.setDomain(domain);
      report.setType(1);
      report.setContent(data);
      report.setPeriod(m_period);
      report.setIp(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());

      try {
         m_reportDao.insert(report);
         // mark success only after the insert actually succeeded
         t.setStatus(Message.SUCCESS);
         return true;
      } catch (DalException e) {
         t.setStatus(e);
         throw new IOException(String.format("Unable to insert report(%s)!", domain), e);
      } finally {
         t.complete();
      }
   }
}
......@@ -62,48 +62,6 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dump-to-hdfs</role-hint>
<implementation>com.dianping.cat.hadoop.DumpToHdfsConsumer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String-remote</role-hint>
<implementation>com.dianping.cat.hadoop.storage.RemoteStringBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.hadoop.dal.ReportDao</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree-remote</role-hint>
<implementation>com.dianping.cat.hadoop.storage.RemoteMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.hadoop.hdfs.OutputChannelManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.hadoop.hdfs.InputChannelManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.hadoop.dal.LogviewDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
<role>com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager</role>
<implementation>com.site.dal.jdbc.datasource.JdbcDataSourceConfigurationManager</implementation>
......
......@@ -78,6 +78,9 @@ public class DisplayTransactionNameReport {
if (m_sorted.equals("failurePercent")) {
return (int) (m2.getDetail().getFailPercent() * 100 - m1.getDetail().getFailPercent() * 100);
}
if (m_sorted.equals("avg")) {
return (int) (m2.getDetail().getAvg() * 100 - m1.getDetail().getAvg() * 100);
}
return 0;
}
}
......
......@@ -73,6 +73,9 @@ public class DisplayTransactionReport {
if (m_sorted.equals("failurePercent")) {
return (int) (m2.getDetail().getFailPercent() * 100 - m1.getDetail().getFailPercent() * 100);
}
if (m_sorted.equals("avg")) {
return (int) (m2.getDetail().getAvg() * 100 - m1.getDetail().getAvg() * 100);
}
return 0;
}
}
......
......@@ -37,9 +37,9 @@ public class CatServlet extends AbstractContainerServlet {
manager.initializeServer(config);
ServerConfigManager configManager = lookup(ServerConfigManager.class);
ServerConfigManager serverConfigManager = lookup(ServerConfigManager.class);
configManager.initialize(catServerXml == null ? null : new File(catServerXml));
serverConfigManager.initialize(catServerXml == null ? null : new File(catServerXml));
final DefaultMessageHandler handler = (DefaultMessageHandler) lookup(MessageHandler.class);
......
......@@ -24,6 +24,10 @@
<servlet-name>mvc-servlet</servlet-name>
<url-pattern>/r/*</url-pattern>
</servlet-mapping>
<servlet-mapping>
<servlet-name>mvc-servlet</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<servlet-mapping>
<servlet-name>cat-servlet</servlet-name>
<url-pattern>/s/*</url-pattern>
......
......@@ -21,11 +21,11 @@
<table class="transaction">
<c:choose>
<c:when test="${empty payload.type}">
<tr><th><a href="?domain=${model.domain}&date=${model.date}&sort=type"> Type</a></th>
<tr><th><a href="?domain=${model.domain}&date=${model.date}&sort=type">Type</a></th>
<th><a href="?domain=${model.domain}&date=${model.date}&sort=total">Total Count</a></th>
<th><a href="?domain=${model.domain}&date=${model.date}&sort=failure">Failure Count</a></th>
<th><a href="?domain=${model.domain}&date=${model.date}&sort=failurePercent">Failure%</a></th>
<th>Sample Link</th><th>Min/Max/Avg/Std(ms)</th></tr>
<th>Sample Link</th><th>Min(ms)</th><th>Max(ms)</th><th><a href="?domain=${model.domain}&date=${model.date}&sort=avg">Avg</a>(ms)</th><th>Std(ms)</th></tr>
<c:forEach var="item" items="${model.displayTypeReport.results}" varStatus="status">
<c:set var="e" value="${item.detail}"/>
<c:set var="lastIndex" value="${status.index}"/>
......@@ -35,18 +35,21 @@
<td>${e.failCount}</td>
<td>${w:format(e.failPercent,'0.00')}</td>
<td><a href="${model.logViewBaseUri}/${empty e.failMessageUrl ? e.successMessageUrl : e.failMessageUrl}">Log View</a></td>
<td>${w:format(e.min,'0.#')}/${w:format(e.max,'0.#')}/${w:format(e.avg,'0.0')}/${w:format(e.std,'0.0')}</td>
<td>${w:format(e.min,'0.#')}</td>
<td>${w:format(e.max,'0.#')}</td>
<td>${w:format(e.avg,'0.0')}</td>
<td>${w:format(e.std,'0.0')}</td>
</tr>
</c:forEach>
</c:when>
<c:otherwise>
<tr>
<th><a href="?op=graphs&domain=${report.domain}&date=${model.date}&type=${payload.type}" class="graph_link" data-status="-1">[:: show ::]</a>
<a href="?domain=${model.domain}&date=${model.date}&type=${payload.type}&sort=type"> Name</a></th>
<a href="?domain=${model.domain}&date=${model.date}&type=${payload.type}&sort=type">Name</a></th>
<th><a href="?domain=${model.domain}&date=${model.date}&type=${payload.type}&sort=total">Total Count</a></th>
<th><a href="?domain=${model.domain}&date=${model.date}&type=${payload.type}&sort=failure">Failure Count</a></th>
<th><a href="?domain=${model.domain}&date=${model.date}&type=${payload.type}&sort=failurePercent">Failure%</a></th>
<th>Sample Link</th><th>Min/Max/Avg/Std(ms)</th></tr>
<th>Sample Link</th><th>Min(ms)</th><th>Max(ms)</th><th><a href="?domain=${model.domain}&date=${model.date}&type=${payload.type}&sort=avg">Avg</a>(ms)</th><th>Std(ms)</th></tr>
<tr class="graphs"><td colspan="6"><div id="-1" style="display:none"></div></td></tr>
<c:forEach var="item" items="${model.displayNameReport.results}" varStatus="status">
<c:set var="e" value="${item.detail}"/>
......@@ -57,7 +60,10 @@
<td>${e.failCount}</td>
<td>${w:format(e.failPercent,'0.00')}</td>
<td><a href="${model.logViewBaseUri}/${empty e.failMessageUrl ? e.successMessageUrl : e.failMessageUrl}">Log View</a></td>
<td>${w:format(e.min,'0.#')}/${w:format(e.max,'0.#')}/${w:format(e.avg,'0.0')}/${w:format(e.std,'0.0')}</td>
<td>${w:format(e.min,'0.#')}</td>
<td>${w:format(e.max,'0.#')}</td>
<td>${w:format(e.avg,'0.0')}</td>
<td>${w:format(e.std,'0.0')}</td>
</tr>
<tr class="graphs"><td colspan="6"><div id="${status.index}" style="display:none"></div></td></tr>
</c:forEach>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册