提交 099d5503 编写于 作者: F Frankie Wu

refactor

上级 23856a15
......@@ -55,7 +55,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
Bucket<MessageTree> logviewBucket = null;
try {
logviewBucket = m_bucketManager.getLogviewBucket(new Date(m_startTime), domain);
logviewBucket = m_bucketManager.getLogviewBucket(m_startTime, domain);
} catch (Exception e) {
m_logger.error(String.format("Error when getting logview bucket of %s!", timestamp), e);
} finally {
......@@ -115,21 +115,20 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
void loadReports() {
Date timestamp = new Date(m_startTime);
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> reportBucket = null;
try {
reportBucket = m_bucketManager.getReportBucket(timestamp, "event");
reportBucket = m_bucketManager.getReportBucket(m_startTime, "event");
for (String id : reportBucket.getIdsByPrefix("")) {
for (String id : reportBucket.getIds()) {
String xml = reportBucket.findById(id);
EventReport report = parser.parse(xml);
m_reports.put(report.getDomain(), report);
}
} catch (Exception e) {
m_logger.error(String.format("Error when loading event reports of %s!", timestamp), e);
m_logger.error(String.format("Error when loading event reports of %s!", new Date(m_startTime)), e);
} finally {
if (reportBucket != null) {
m_bucketManager.closeBucket(reportBucket);
......@@ -258,7 +257,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
String domain = tree.getDomain();
try {
Bucket<MessageTree> logviewBucket = m_bucketManager.getLogviewBucket(new Date(m_startTime), domain);
Bucket<MessageTree> logviewBucket = m_bucketManager.getLogviewBucket(m_startTime, domain);
logviewBucket.storeById(messageId, tree);
} catch (IOException e) {
......@@ -267,13 +266,12 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
}
void storeReports(Collection<EventReport> reports) {
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> reportBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
try {
reportBucket = m_bucketManager.getReportBucket(timestamp, "event");
reportBucket = m_bucketManager.getReportBucket(m_startTime, "event");
for (EventReport report : reports) {
String xml = builder.buildXml(report);
......@@ -286,7 +284,7 @@ public class EventAnalyzer extends AbstractMessageAnalyzer<EventReport> implemen
} catch (Exception e) {
Cat.getProducer().logError(e);
t.setStatus(e);
m_logger.error(String.format("Error when storing event reports of %s!", timestamp), e);
m_logger.error(String.format("Error when storing event reports of %s!", new Date(m_startTime)), e);
} finally {
t.complete();
......
......@@ -55,7 +55,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
Bucket<MessageTree> logviewBucket = null;
try {
logviewBucket = m_bucketManager.getLogviewBucket(new Date(m_startTime), domain);
logviewBucket = m_bucketManager.getLogviewBucket(m_startTime, domain);
} catch (Exception e) {
m_logger.error(String.format("Error when getting logview bucket of %s!", timestamp), e);
} finally {
......@@ -129,21 +129,20 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
void loadReports() {
Date timestamp = new Date(m_startTime);
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, "problem");
bucket = m_bucketManager.getReportBucket(m_startTime, "problem");
for (String id : bucket.getIdsByPrefix("")) {
for (String id : bucket.getIds()) {
String xml = bucket.findById(id);
ProblemReport report = parser.parse(xml);
m_reports.put(report.getDomain(), report);
}
} catch (Exception e) {
m_logger.error(String.format("Error when loading problem reports of %s!", timestamp), e);
m_logger.error(String.format("Error when loading problem reports of %s!", new Date(m_startTime)), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
......@@ -199,7 +198,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
String domain = tree.getDomain();
try {
Bucket<MessageTree> logviewBucket = m_bucketManager.getLogviewBucket(new Date(m_startTime), domain);
Bucket<MessageTree> logviewBucket = m_bucketManager.getLogviewBucket(m_startTime, domain);
logviewBucket.storeById(messageId, tree);
} catch (Exception e) {
......@@ -208,16 +207,12 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
void storeReports(Collection<ProblemReport> reports) {
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> reportBucket = null;
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
try {
reportBucket = m_bucketManager.getReportBucket(timestamp, "problem");
// delete old one, not append mode
reportBucket.deleteAndCreate();
reportBucket = m_bucketManager.getReportBucket(m_startTime, "problem");
for (ProblemReport report : reports) {
String xml = builder.buildXml(report);
......@@ -230,7 +225,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
} catch (Exception e) {
Cat.getProducer().logError(e);
t.setStatus(e);
m_logger.error(String.format("Error when storing problem reports to %s!", timestamp), e);
m_logger.error(String.format("Error when storing problem reports to %s!", new Date(m_startTime)), e);
} finally {
t.complete();
......
......@@ -49,15 +49,13 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
private long m_duration;
void closeMessageBuckets() {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
Bucket<MessageTree> logviewBucket = null;
try {
logviewBucket = m_bucketManager.getLogviewBucket(timestamp, domain);
logviewBucket = m_bucketManager.getLogviewBucket(m_startTime, domain);
} catch (Exception e) {
m_logger.error(String.format("Error when getting logview bucket of %s!", timestamp), e);
m_logger.error(String.format("Error when getting logview bucket of %s!", new Date(m_startTime)), e);
} finally {
if (logviewBucket != null) {
m_bucketManager.closeBucket(logviewBucket);
......@@ -116,21 +114,20 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
void loadReports() {
Date timestamp = new Date(m_startTime);
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, "transaction");
bucket = m_bucketManager.getReportBucket(m_startTime, "transaction");
for (String id : bucket.getIdsByPrefix("")) {
for (String id : bucket.getIds()) {
String xml = bucket.findById(id);
TransactionReport report = parser.parse(xml);
m_reports.put(report.getDomain(), report);
}
} catch (Exception e) {
m_logger.error(String.format("Error when loading transacion reports of %s!", timestamp), e);
m_logger.error(String.format("Error when loading transacion reports of %s!", new Date(m_startTime)), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
......@@ -271,7 +268,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String domain = tree.getDomain();
try {
Bucket<MessageTree> logviewBucket = m_bucketManager.getLogviewBucket(new Date(m_startTime), domain);
Bucket<MessageTree> logviewBucket = m_bucketManager.getLogviewBucket(m_startTime, domain);
logviewBucket.storeById(messageId, tree);
} catch (IOException e) {
......@@ -280,13 +277,12 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
void storeReports(Collection<TransactionReport> reports) {
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Transaction t = Cat.getProducer().newTransaction("Checkpoint", getClass().getSimpleName());
Bucket<String> reportBucket = null;
try {
reportBucket = m_bucketManager.getReportBucket(timestamp, "transaction");
reportBucket = m_bucketManager.getReportBucket(m_startTime, "transaction");
for (TransactionReport report : reports) {
String xml = builder.buildXml(report);
......@@ -299,7 +295,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
} catch (Exception e) {
Cat.getProducer().logError(e);
t.setStatus(e);
m_logger.error(String.format("Error when storing transaction reports of %s!", timestamp), e);
m_logger.error(String.format("Error when storing transaction reports of %s!", new Date(m_startTime)), e);
} finally {
t.complete();
......
......@@ -8,10 +8,10 @@ import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.dianping.cat.storage.internal.DefaultBucketManager;
import com.dianping.cat.storage.internal.LocalMessageBucket;
import com.dianping.cat.storage.internal.LocalStringBucket;
import com.dianping.cat.storage.DefaultBucketManager;
import com.dianping.cat.storage.message.LocalLogviewBucket;
import com.dianping.cat.storage.message.LocalMessageBucket;
import com.dianping.cat.storage.report.LocalReportBucket;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -20,29 +20,20 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(Bucket.class, String.class.getName() + "-local", LocalStringBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
all.add(C(Bucket.class, MessageTree.class.getName() + "-local", LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class) //
.req(MessagePathBuilder.class));
.req(MessagePathBuilder.class));
all.add(C(Bucket.class, MessageTree.class.getName() + "-logview", LocalLogviewBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, MessageTree.class.getName() + "-message",//
com.dianping.cat.storage.message.LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, String.class.getName() + "-report",//
com.dianping.cat.storage.report.LocalReportBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, MessageTree.class.getName() + "-message", LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, String.class.getName() + "-report", LocalReportBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
return all;
}
......
......@@ -46,6 +46,7 @@ public class LocalIP {
}
public static String getAddress() {
return address;
return NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
// return address;
}
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
......@@ -46,7 +47,11 @@ public enum NetworkInterfaceManager {
}
public String getLocalHostName() {
return m_local.getCanonicalHostName();
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
return m_local.getHostName();
}
}
public String getLocalHostAddress() {
......
package com.dianping.cat.message.internal;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.Stack;
......@@ -10,7 +8,7 @@ import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.LocalIP;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.configuration.client.entity.ClientConfig;
import com.dianping.cat.configuration.client.entity.Domain;
import com.dianping.cat.message.Message;
......@@ -154,14 +152,10 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
m_domain = firstDomain == null ? new Domain("unknown").setEnabled(false) : firstDomain;
try {
m_hostName = InetAddress.getLocalHost().getHostName();
m_hostName = NetworkInterfaceManager.INSTANCE.getLocalHostName();
if (m_domain.getIp() == null) {
m_domain.setIp(LocalIP.getAddress());
}
} catch (UnknownHostException e) {
m_logger.warn("Unable to get local host!", e);
if (m_domain.getIp() == null) {
m_domain.setIp(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());
}
// initialize milli-second resolution level timer
......
......@@ -78,12 +78,15 @@ public class CatFilter implements Filter {
StringBuilder sb = new StringBuilder(1024);
String ip = "";
String ipForwarded = req.getHeader("x-forwarded-for");
if (ipForwarded == null) {
ip = req.getRemoteAddr();
} else {
String ips[] = ipForwarded.split(",");
ip = ips[ips.length - 1].trim();
}
sb.append("RemoteIp=").append(ip);
sb.append("&VirtualIP=").append(req.getRemoteAddr());
sb.append("&Server=").append(req.getServerName());
......
......@@ -3,23 +3,66 @@ package com.dianping.cat.storage;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
public interface Bucket<T> {
/**
* Close bucket and release component instance
*
* @throws IOException
*/
public void close() throws IOException;
public void deleteAndCreate() throws IOException;
/**
* Find data by given id in the bucket. return null if not found.
*
* @param id
* @return data for given id, null if not found
* @throws IOException
*/
public T findById(String id) throws IOException;
/**
* Find next id with same tag in the bucket. return null if not found.
*
* @param id
* @param tag
* @return next data for given id with tag, null if not found.
* @throws IOException
*/
public T findNextById(String id, String tag) throws IOException;
/**
* Find previous id with same tag in the bucket. return null if not found.
*
* @param id
* @param tag
* @return previous data for given id with tag, null if not found.
* @throws IOException
*/
public T findPreviousById(String id, String tag) throws IOException;
/**
* Flush the buffered data in the bucket if have.
*
* @throws IOException
*/
public void flush() throws IOException;
public Collection<String> getIdsByPrefix(String prefix);
/**
* Return all ids in the bucket.
*
* @return
*/
public Collection<String> getIds();
/**
* Initialize the bucket after its creation.
*
* @param type
* @param name
* @param timestamp
* @throws IOException
*/
public void initialize(Class<?> type, String name, Date timestamp) throws IOException;
/**
......@@ -31,6 +74,4 @@ public interface Bucket<T> {
* @throws IOException
*/
public boolean storeById(String id, T data) throws IOException;
List<String> findAllById(String id) throws IOException;
}
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.Date;
import com.dianping.cat.message.spi.MessageTree;
public interface BucketManager {
public void closeBucket(Bucket<?> bucket);
public Bucket<MessageTree> getLogviewBucket(Date timestamp, String domain) throws IOException;
public Bucket<MessageTree> getLogviewBucket(long timestamp, String domain) throws IOException;
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain) throws IOException;
public Bucket<MessageTree> getMessageBucket(long timestamp, String domain) throws IOException;
public Bucket<String> getReportBucket(Date timestamp, String name) throws IOException;
public Bucket<String> getReportBucket(long timestamp, String name) throws IOException;
}
package com.dianping.cat.storage.internal;
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.Date;
......@@ -9,8 +9,6 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
......@@ -63,13 +61,14 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
}
@SuppressWarnings("unchecked")
protected <T> Bucket<T> getBucket(Class<T> type, Date timestamp, String name, String namespace) throws IOException {
protected <T> Bucket<T> getBucket(Class<T> type, long timestamp, String name, String namespace) throws IOException {
String path;
Date date = new Date(timestamp);
if (type == MessageTree.class) {
path = m_pathBuilder.getMessagePath(name, timestamp);
path = m_pathBuilder.getMessagePath(name, date);
} else {
path = m_pathBuilder.getReportPath(name, timestamp);
path = m_pathBuilder.getReportPath(name, date);
}
Entry entry = new Entry(type, path, namespace);
......@@ -80,7 +79,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
bucket = m_map.get(entry);
if (bucket == null) {
bucket = createBucket(type, timestamp, name, namespace);
bucket = createBucket(type, date, name, namespace);
m_map.put(entry, bucket);
}
}
......@@ -90,17 +89,17 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
}
@Override
public Bucket<MessageTree> getLogviewBucket(Date timestamp, String domain) throws IOException {
public Bucket<MessageTree> getLogviewBucket(long timestamp, String domain) throws IOException {
return getBucket(MessageTree.class, timestamp, domain, "logview");
}
@Override
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain) throws IOException {
public Bucket<MessageTree> getMessageBucket(long timestamp, String domain) throws IOException {
return getBucket(MessageTree.class, timestamp, domain, "message");
}
@Override
public Bucket<String> getReportBucket(Date timestamp, String name) throws IOException {
public Bucket<String> getReportBucket(long timestamp, String name) throws IOException {
return getBucket(String.class, timestamp, name, "report");
}
......
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.storage.Bucket;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
/**
 * Skeleton for file-backed buckets. All records of a bucket live in a single
 * data file using the layout:
 *
 * <xmp> <id>\t<tag1>\t<tag2>\t...\n <length of payload>\n <payload>\n </xmp>
 *
 * <p>An in-memory index (id -> file offset, tag -> list of ids) is rebuilt from
 * the file in {@link #initialize(Class, String, Date)}. Subclasses supply the
 * payload codec via {@link #encode(Object, ChannelBuffer)} /
 * {@link #decode(ChannelBuffer)} and the file location via
 * {@link #getLogicalPath(Date, String)}.
 *
 * <p>NOTE(review): reads and writes are guarded by two separate locks around the
 * two file handles, but the index maps themselves are plain {@link HashMap}s —
 * this presumes a single writer thread; confirm against callers.
 */
public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
	private static final String[] EMPTY = new String[0];

	@Inject
	private String m_baseDir = "target/bucket";

	// id => offset of record within the data file
	private Map<String, Long> m_idToOffsets = new HashMap<String, Long>();

	// tag => ids carrying that tag, in insertion order
	private Map<String, List<String>> m_tagToIds = new HashMap<String, List<String>>();

	private File m_file;

	private RandomAccessFile m_readFile;

	private RandomAccessFile m_writeFile;

	private ReentrantLock m_readLock;

	private ReentrantLock m_writeLock;

	private Logger m_logger;

	@Override
	public void close() {
		m_writeLock.lock();

		try {
			m_idToOffsets.clear();
			m_tagToIds.clear();
			m_writeFile.close();

			// FIX: the read handle was previously leaked on close
			m_readFile.close();
		} catch (IOException e) {
			// ignore it
		} finally {
			m_writeLock.unlock();
		}
	}

	// Close a handle while swallowing errors; used when recreating the bucket file.
	private void closeQuietly(RandomAccessFile file) {
		if (file != null) {
			try {
				file.close();
			} catch (IOException e) {
				// ignore it
			}
		}
	}

	/**
	 * Decode one payload from the buffer into a record.
	 *
	 * @param buf buffer positioned at the start of the payload
	 * @return the decoded record
	 * @throws IOException if decoding fails
	 */
	protected abstract T decode(ChannelBuffer buf) throws IOException;

	@Override
	public void deleteAndCreate() {
		m_writeLock.lock();
		m_readLock.lock();

		m_idToOffsets.clear();
		m_tagToIds.clear();

		try {
			// FIX: close the stale handles before deleting, otherwise the old
			// file descriptors leak (and on Windows the delete would fail)
			closeQuietly(m_writeFile);
			closeQuietly(m_readFile);

			m_file.delete();
			m_writeFile = new RandomAccessFile(m_file, "rw");
			m_readFile = new RandomAccessFile(m_file, "r");
		} catch (FileNotFoundException e) {
			m_logger.error(String.format("Error when clearing file bucket(%s)!", m_file), e);
		} finally {
			m_readLock.unlock();
			m_writeLock.unlock();
		}
	}

	@Override
	public void enableLogging(Logger logger) {
		m_logger = logger;
	}

	/**
	 * Not supported at this level; subclasses supply a tagged variant via
	 * {@link #storeById(String, Object, String...)}.
	 */
	@Override
	public boolean storeById(String id, T data) throws IOException {
		throw new UnsupportedOperationException();
	}

	/** Not supported by file buckets. */
	@Override
	public List<String> findAllById(String id) throws IOException {
		throw new UnsupportedOperationException();
	}

	/**
	 * Encode one record into the buffer, prefixed with its payload length as an
	 * int (consumed again by {@link #storeById(String, Object, String...)}).
	 *
	 * @param data record to encode
	 * @param buf target buffer
	 * @throws IOException if encoding fails
	 */
	protected abstract void encode(T data, ChannelBuffer buf) throws IOException;

	/**
	 * Find a record by id via the in-memory offset index.
	 *
	 * @return the decoded record, or null when the id is unknown or the read fails
	 */
	@Override
	public T findById(String id) {
		Long offset = m_idToOffsets.get(id);

		if (offset != null) {
			m_readLock.lock();

			try {
				long old = m_readFile.getFilePointer();

				m_readFile.seek(offset);
				m_readFile.readLine(); // first line is header, get rid of it

				int num = Integer.parseInt(m_readFile.readLine());
				byte[] bytes = new byte[num];

				m_readFile.readFully(bytes);

				ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bytes);
				T data = decode(buf);

				m_readFile.seek(old);
				return data;
			} catch (Exception e) {
				m_logger.error(String.format("Error when reading file(%s)!", m_file), e);
			} finally {
				m_readLock.unlock();
			}
		}

		return null;
	}

	/**
	 * Find the record after the given id within the same tag list.
	 *
	 * <p>NOTE(review): when the id is not in the list, indexOf() returns -1 and
	 * this resolves to the FIRST id of the tag — confirm that is intended.
	 */
	@Override
	public T findNextById(String id, String tag) throws IOException {
		List<String> ids = m_tagToIds.get(tag);

		if (ids != null) {
			int index = ids.indexOf(id);

			index++;

			if (index >= 0 && index < ids.size()) {
				String nextId = ids.get(index);

				return findById(nextId);
			}
		}

		return null;
	}

	/** Find the record before the given id within the same tag list. */
	@Override
	public T findPreviousById(String id, String tag) throws IOException {
		List<String> ids = m_tagToIds.get(tag);

		if (ids != null) {
			int index = ids.indexOf(id);

			index--;

			if (index >= 0 && index < ids.size()) {
				String nextId = ids.get(index);

				return findById(nextId);
			}
		}

		return null;
	}

	public String[] findTagsById(String id) {
		// TODO
		return EMPTY;
	}

	@Override
	public void flush() throws IOException {
		m_writeLock.lock();

		try {
			m_writeFile.getChannel().force(true);
		} finally {
			// FIX: this was m_writeLock.lock(), which left the (reentrant) lock
			// held forever and blocked every later writer
			m_writeLock.unlock();
		}
	}

	/** Live view of all ids currently indexed in this bucket. */
	public Set<String> getIds() {
		return m_idToOffsets.keySet();
	}

	@Override
	public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
		m_writeLock = new ReentrantLock();
		m_readLock = new ReentrantLock();
		m_file = new File(m_baseDir, getLogicalPath(timestamp, name));
		m_file.getParentFile().mkdirs();

		// FIX: capture existence BEFORE the "rw" open creates an empty file,
		// otherwise the exists() check below is always true
		boolean existed = m_file.exists();

		m_writeFile = new RandomAccessFile(m_file, "rw");
		m_readFile = new RandomAccessFile(m_file, "r");

		if (existed) {
			loadIndexes();
		}
	}

	/** Resolve the data file path relative to the base directory. */
	protected abstract String getLogicalPath(Date timestamp, String name);

	/** Whether each store forces the channel to disk immediately. */
	protected abstract boolean isAutoFlush();

	/**
	 * Rebuild the in-memory indexes by scanning the whole data file. Leaves the
	 * write file pointer at EOF, ready for appending.
	 */
	protected void loadIndexes() throws IOException {
		byte[] data = new byte[8192];

		m_writeLock.lock();

		try {
			while (true) {
				long offset = m_writeFile.getFilePointer();
				String first = m_writeFile.readLine();

				if (first == null) { // EOF
					break;
				}

				int num = -1;

				// if the index was corrupted, then try to skip some lines
				try {
					num = Integer.parseInt(m_writeFile.readLine());
				} catch (NumberFormatException e) {
					m_logger.warn("Error during loadIndexes: " + e.getMessage());
				}

				if (num < 0) {
					// FIX: readFully(data, 0, -1) threw IndexOutOfBoundsException
					// here; skip this line and try to resync on the next one
					continue;
				}

				if (num > data.length) {
					int newSize = data.length;

					while (newSize < num) {
						newSize += newSize / 2;
					}

					data = new byte[newSize];
				}

				m_writeFile.readFully(data, 0, num); // get rid of it
				m_writeFile.readLine(); // get rid of empty line

				List<String> parts = Splitters.by('\t').split(first);

				if (parts.size() > 0) {
					String id = parts.get(0);

					parts.remove(0);
					updateIndex(id, parts.toArray(EMPTY), offset);
				}
			}
		} finally {
			m_writeLock.unlock();
		}
	}

	/**
	 * Store the message in the format of:<br>
	 *
	 * <xmp> <id>\t<tag1>\t<tag2>\t...\n <length of message>\n <message>\n </xmp>
	 *
	 * @return false when the id already exists or the write fails
	 */
	protected boolean storeById(String id, T data, String... tags) {
		if (m_idToOffsets.containsKey(id)) {
			return false;
		}

		MessageProducer cat = Cat.getProducer();
		Transaction t = cat.newTransaction("Bucket", getClass().getSimpleName());
		ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
		String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n";
		byte[] firstLine;
		byte[] num;
		int length;

		t.setStatus(Message.SUCCESS);

		try {
			try {
				encode(data, buf);

				// encode() writes a leading int length; consume it here so only
				// the raw payload is written to the file
				length = buf.readInt();
				firstLine = attributes.getBytes("utf-8");
				num = String.valueOf(length).getBytes("utf-8");
			} catch (IOException e) {
				m_logger.error(String.format("Error when preparing to write to file(%s)!", m_file), e);
				t.setStatus(e);
				return false;
			}

			m_writeLock.lock();

			try {
				long offset = m_writeFile.getFilePointer();

				m_writeFile.write(firstLine);
				m_writeFile.write(num);
				m_writeFile.write('\n');
				m_writeFile.write(buf.array(), buf.readerIndex(), length);
				m_writeFile.write('\n');

				if (isAutoFlush()) {
					m_writeFile.getChannel().force(true);
				}

				updateIndex(id, tags, offset);
				return true;
			} catch (Exception e) {
				m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
				t.setStatus(e);
				return false;
			} finally {
				m_writeLock.unlock();
			}
		} finally {
			t.complete();
		}
	}

	/** All indexed ids whose key starts with the given prefix. */
	@Override
	public Collection<String> getIdsByPrefix(String prefix) {
		List<String> ids = new ArrayList<String>();

		for (String id : m_idToOffsets.keySet()) {
			if (id.startsWith(prefix)) {
				ids.add(id);
			}
		}

		return ids;
	}

	@Override
	public String toString() {
		return String.format("%s[file=%s, ids=%s]", getClass().getSimpleName(), m_file, m_idToOffsets.keySet());
	}

	// Record the id's offset and register the id under each of its tags.
	protected void updateIndex(String id, String[] tags, long offset) {
		m_idToOffsets.put(id, offset);

		for (String tag : tags) {
			List<String> ids = m_tagToIds.get(tag);

			if (ids == null) {
				ids = new ArrayList<String>();
				m_tagToIds.put(tag, ids);
			}

			if (!ids.contains(id)) {
				ids.add(id);
			}
		}
	}
}
package com.dianping.cat.storage.internal;
import java.io.IOException;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
/**
 * File-backed bucket for raw message trees. Payload encoding/decoding is
 * delegated to the injected {@link MessageCodec}; each stored tree is indexed
 * under thread ("t:"), session ("s:") and request ("r:") tags.
 */
public class LocalMessageBucket extends AbstractFileBucket<MessageTree> {
	@Inject
	private MessageCodec m_codec;

	@Inject
	private MessagePathBuilder m_pathBuilder;

	@Override
	protected MessageTree decode(ChannelBuffer buf) throws IOException {
		return m_codec.decode(buf);
	}

	@Override
	protected void encode(MessageTree tree, ChannelBuffer buf) throws IOException {
		m_codec.encode(tree, buf);
	}

	@Override
	protected String getLogicalPath(Date timestamp, String domain) {
		return m_pathBuilder.getMessagePath(domain, timestamp);
	}

	@Override
	protected boolean isAutoFlush() {
		// force every record to disk as soon as it is written
		return true;
	}

	public void setCodec(MessageCodec codec) {
		m_codec = codec;
	}

	@Override
	public boolean storeById(String id, MessageTree tree) {
		// index by thread id, session token and message id for navigation
		String[] tags = { "t:" + tree.getThreadId(), "s:" + tree.getSessionToken(), "r:" + tree.getMessageId() };

		return storeById(id, tree, tags);
	}
}
package com.dianping.cat.storage.internal;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.site.lookup.annotation.Inject;
/**
 * File-backed bucket for UTF-8 string payloads (e.g. XML reports). Stored
 * entries carry no tags, so only id-based lookup is meaningful.
 */
public class LocalStringBucket extends AbstractFileBucket<String> {
	private static final String[] EMPTY = new String[0];

	@Inject
	private MessagePathBuilder m_pathBuilder;

	@Override
	protected String decode(ChannelBuffer buf) throws IOException {
		// FIX: dropped the redundant (String) cast; toString already returns String
		return buf.toString(buf.readerIndex(), buf.readableBytes(), Charset.forName("utf-8"));
	}

	@Override
	protected void encode(String data, ChannelBuffer buf) throws IOException {
		// FIX: dropped the redundant (String) cast of an already-String parameter
		byte[] bytes = data.getBytes("utf-8");

		// length prefix is consumed by the base class before writing to disk
		buf.writeInt(bytes.length);
		buf.writeBytes(bytes);
	}

	@Override
	public boolean storeById(String id, String data) {
		// string entries are untagged
		return storeById(id, data, EMPTY);
	}

	@Override
	protected boolean isAutoFlush() {
		// force every record to disk as soon as it is written
		return true;
	}

	@Override
	protected String getLogicalPath(Date timestamp, String name) {
		return m_pathBuilder.getReportPath(name, timestamp);
	}
}
......@@ -26,7 +26,6 @@ import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.helper.Splitters.StringSplitter;
import com.site.lookup.annotation.Inject;
......@@ -74,29 +73,16 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
m_tagToIds.clear();
m_writeDataFile.close();
m_writeIndexFile.close();
} catch (Exception e) {
// ignore it
} finally {
m_writeLock.unlock();
}
}
@Override
public void deleteAndCreate() throws IOException {
new File(m_baseDir, m_logicalPath).delete();
new File(m_baseDir, m_logicalPath + ".idx").delete();
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public List<String> findAllById(String id) throws IOException {
throw new UnsupportedOperationException("Not supported by local logview bucket!");
}
@Override
public MessageTree findById(String id) throws IOException {
Long offset = m_idToOffsets.get(id);
......@@ -182,10 +168,14 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
}
@Override
public Collection<String> getIdsByPrefix(String tag) {
public Collection<String> getIds() {
throw new UnsupportedOperationException("Not supported by local logview bucket!");
}
public String getLogicalPath() {
return m_logicalPath;
}
@Override
public void initialize(Class<?> type, String domain, Date timestamp) throws IOException {
m_writeLock = new ReentrantLock();
......@@ -224,12 +214,13 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
List<String> parts = splitter.split(line);
if (parts.size() >= 2) {
String id = parts.remove(0);
String offset = parts.remove(0);
if (parts.size() >= 3) {
String id = parts.get(0);
String offset = parts.get(1);
String tag = parts.get(2);
try {
updateIndex(id, Long.parseLong(offset), parts);
updateIndex(id, Long.parseLong(offset), tag);
} catch (NumberFormatException e) {
// ignore it
}
......@@ -240,14 +231,6 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
}
}
protected List<String> prepareTags(MessageTree tree) {
List<String> tags = new ArrayList<String>(1);
tags.add("t:" + tree.getThreadId());
return tags;
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
......@@ -258,7 +241,6 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
return false;
}
List<String> tags = prepareTags(tree);
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
......@@ -275,14 +257,15 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
m_writeDataFile.write('\n');
long offset = m_writeDataFileLength;
String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n';
String tag = "t:" + tree.getThreadId();
String line = id + '\t' + offset + '\t' + tag + '\n';
byte[] data = line.getBytes("utf-8");
m_writeDataFileLength += num.length + 1 + length + 1;
m_writeIndexFile.write(data);
m_dirty.set(true);
updateIndex(id, offset, tags);
updateIndex(id, offset, tag);
return true;
} finally {
......@@ -290,21 +273,19 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
}
}
protected void updateIndex(String id, long offset, List<String> tags) {
protected void updateIndex(String id, long offset, String tag) {
m_idToOffsets.put(id, offset);
for (String tag : tags) {
List<String> ids = m_tagToIds.get(tag);
List<String> ids = m_tagToIds.get(tag);
if (ids == null) {
ids = new ArrayList<String>(3);
if (ids == null) {
ids = new ArrayList<String>(3);
m_tagToIds.put(tag, ids);
}
m_tagToIds.put(tag, ids);
}
if (!ids.contains(id)) {
ids.add(id);
}
if (!ids.contains(id)) {
ids.add(id);
}
}
}
......@@ -7,7 +7,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.buffer.ChannelBuffer;
......@@ -33,29 +32,19 @@ public class LocalMessageBucket implements Bucket<MessageTree> {
private OutputStream m_writeDataFile;
private String m_logicalPath;
@Override
public void close() throws IOException {
m_writeLock.lock();
try {
m_writeDataFile.close();
} catch (Exception e) {
// ignore it
} finally {
m_writeLock.unlock();
}
}
@Override
public void deleteAndCreate() throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
}
@Override
public List<String> findAllById(String id) throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
}
@Override
public MessageTree findById(String id) throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
......@@ -83,10 +72,14 @@ public class LocalMessageBucket implements Bucket<MessageTree> {
}
@Override
public Collection<String> getIdsByPrefix(String tag) {
public Collection<String> getIds() {
throw new UnsupportedOperationException("Not supported by local logview bucket!");
}
public String getLogicalPath() {
return m_logicalPath;
}
@Override
public void initialize(Class<?> type, String domain, Date timestamp) throws IOException {
m_writeLock = new ReentrantLock();
......@@ -95,6 +88,8 @@ public class LocalMessageBucket implements Bucket<MessageTree> {
File dataFile = new File(m_baseDir, logicalPath);
dataFile.getParentFile().mkdirs();
m_logicalPath = logicalPath;
m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile), 8192);
}
......
......@@ -62,32 +62,16 @@ public class LocalReportBucket implements Bucket<String>, LogEnabled {
m_tagToIds.clear();
m_writeDataFile.close();
m_writeIndexFile.close();
} catch (Exception e) {
// ignore it
} finally {
m_writeLock.unlock();
}
}
@Override
public void deleteAndCreate() throws IOException {
File dataFile = new File(m_baseDir, m_logicalPath);
File indexFile = new File(m_baseDir, m_logicalPath + ".idx");
dataFile.delete();
indexFile.delete();
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public List<String> findAllById(String id) throws IOException {
throw new UnsupportedOperationException("Not supported by local logview bucket!");
}
@Override
public String findById(String id) throws IOException {
Long offset = m_idToOffsets.get(id);
......@@ -154,13 +138,25 @@ public class LocalReportBucket implements Bucket<String>, LogEnabled {
@Override
public void flush() throws IOException {
m_writeLock.lock();
try {
m_writeDataFile.flush();
m_writeIndexFile.flush();
} finally {
m_writeLock.unlock();
}
}
@Override
public Collection<String> getIdsByPrefix(String tag) {
public Collection<String> getIds() {
return m_idToOffsets.keySet();
}
public String getLogicalPath() {
return m_logicalPath;
}
@Override
public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
m_writeLock = new ReentrantLock();
......@@ -171,16 +167,16 @@ public class LocalReportBucket implements Bucket<String>, LogEnabled {
File dataFile = new File(m_baseDir, logicalPath);
File indexFile = new File(m_baseDir, logicalPath + ".idx");
if (indexFile.exists()) {
loadIndexes(indexFile);
}
dataFile.getParentFile().mkdirs();
m_logicalPath = logicalPath;
m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile), 8192);
m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile), 8192);
m_readDataFile = new RandomAccessFile(dataFile, "r");
if (indexFile.exists()) {
loadIndexes(indexFile);
}
}
protected void loadIndexes(File indexFile) throws IOException {
......
......@@ -235,35 +235,9 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String-local</role-hint>
<implementation>com.dianping.cat.storage.internal.LocalStringBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree-local</role-hint>
<implementation>com.dianping.cat.storage.internal.LocalMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.BucketManager</role>
<implementation>com.dianping.cat.storage.internal.DefaultBucketManager</implementation>
<implementation>com.dianping.cat.storage.DefaultBucketManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
......
......@@ -2,7 +2,6 @@ package com.dianping.cat.storage;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -48,7 +47,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
@Test
public void testMessageBucket() throws Exception {
Date timestamp = new Date();
long timestamp = System.currentTimeMillis();
BucketManager manager = lookup(BucketManager.class);
final Bucket<MessageTree> bucket = manager.getLogviewBucket(timestamp, "concurrent/message");
ExecutorService pool = Executors.newFixedThreadPool(10);
......@@ -113,7 +112,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
@Test
public void testStringBucket() throws Exception {
Date timestamp = new Date();
long timestamp = System.currentTimeMillis();
BucketManager manager = lookup(BucketManager.class);
final Bucket<String> bucket = manager.getReportBucket(timestamp, "concurrent/data");
ExecutorService pool = Executors.newFixedThreadPool(10);
......
package com.dianping.cat.storage;
import java.util.Date;
import junit.framework.Assert;
import org.junit.Test;
......@@ -15,7 +13,7 @@ import com.site.lookup.ComponentTestCase;
public class BucketManagerTest extends ComponentTestCase {
@Test
public void test() throws Exception {
Date timestamp = new Date();
long timestamp = System.currentTimeMillis();
BucketManager manager = lookup(BucketManager.class);
Bucket<MessageTree> bucket1 = manager.getLogviewBucket(timestamp, "test/path1");
Bucket<MessageTree> bucket2 = manager.getLogviewBucket(timestamp, "test/path2");
......
......@@ -76,7 +76,6 @@ public abstract class MesageTreeBucketTestCase extends ComponentTestCase {
public void tearDown() throws Exception {
super.tearDown();
bucket.close();
bucket.deleteAndCreate();
}
@Test
......
......@@ -74,7 +74,6 @@ public abstract class StringBucketTestCase extends ComponentTestCase {
public void tearDown() throws Exception {
super.tearDown();
bucket.close();
bucket.deleteAndCreate();
}
@Test
......
......@@ -67,20 +67,11 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
m_outputChannelManager.closeChannel(m_outputChannel);
}
@Override
public void deleteAndCreate() throws IOException {
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public List<String> findAllById(String id) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public MessageTree findById(String id) throws IOException {
try {
......@@ -138,7 +129,7 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
}
@Override
public Collection<String> getIdsByPrefix(String prefix) {
public Collection<String> getIds() {
throw new UnsupportedOperationException();
}
......
......@@ -35,10 +35,6 @@ public class RemoteStringBucket implements Bucket<String>, LogEnabled {
public void close() throws IOException {
}
@Override
public void deleteAndCreate() throws IOException {
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
......@@ -49,10 +45,9 @@ public class RemoteStringBucket implements Bucket<String>, LogEnabled {
throw new UnsupportedOperationException();
}
@Override
public List<String> findAllById(String domain) throws IOException {
try {
List<Report> reports = m_reportDao.findByPeriodDomainTypeName(m_period, domain, 1, m_name, ReportEntity.READSET_FULL);
List<Report> reports = m_reportDao.findAllByPeriodDomainTypeName(m_period, domain, 1, m_name, ReportEntity.READSET_FULL);
List<String> contents = new ArrayList<String>(reports.size());
for (Report r : reports) {
contents.add(r.getContent());
......@@ -78,7 +73,9 @@ public class RemoteStringBucket implements Bucket<String>, LogEnabled {
}
@Override
public Collection<String> getIdsByPrefix(String name) {
public Collection<String> getIds() {
String name = "";
try {
List<Report> reports = m_reportDao.findAllByPeriodTypeName(m_period, 1, name, ReportEntity.READSET_FULL);
List<String> ids = new ArrayList<String>(reports.size());
......
......@@ -34,7 +34,7 @@
LIMIT 1
]]></statement>
</query>
<query name="find-by-message-ids" type="SELECT" multiple="true">
<query name="find-all-by-message-ids" type="SELECT" multiple="true">
<param name="message-ids" />
<statement><![CDATA[
SELECT <FIELDS/>
......@@ -42,7 +42,7 @@
WHERE <FIELD name='message-id'/> <IN> ${message-ids} </IN>
]]></statement>
</query>
<query name="find-by-message-id" type="SELECT" multiple="false">
<query name="find-by-message-id" type="SELECT">
<param name="message-id" />
<statement><![CDATA[
SELECT <FIELDS/>
......@@ -62,7 +62,7 @@
<entity name="report" table="report" alias="r">
<member name="creation-date" insert-expr="now()" />
<query-defs>
<query name="find-by-period-domain-type-name" type="SELECT" multiple="true">
<query name="find-all-by-period-domain-type-name" type="SELECT" multiple="true">
<param name="period" />
<param name="domain" />
<param name="type" />
......
......@@ -5,15 +5,16 @@ import java.util.List;
import com.dianping.cat.consumer.event.model.entity.EventReport;
import com.dianping.cat.consumer.event.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.hadoop.dal.ReportEntity;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HistoricalEventService extends BaseHistoricalModelService<EventReport> {
@Inject
private BucketManager m_bucketManager;
private ReportDao m_reportDao;
public HistoricalEventService() {
super("event");
......@@ -23,32 +24,21 @@ public class HistoricalEventService extends BaseHistoricalModelService<EventRepo
protected EventReport buildModel(ModelRequest request) throws Exception {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
Bucket<String> bucket = null;
List<Report> reports = m_reportDao.findAllByPeriodDomainTypeName(new Date(date), domain, 1, getName(),
ReportEntity.READSET_FULL);
EventReportMerger merger = null;
try {
bucket = m_bucketManager.getReportBucket(new Date(date), getName());
for (Report report : reports) {
String xml = report.getContent();
EventReport model = new DefaultXmlParser().parse(xml);
List<String> xmls = bucket.findAllById(domain);
EventReportMerger merger = null;
if (xmls != null) {
for (String xml : xmls) {
EventReport model = new DefaultXmlParser().parse(xml);
if (merger == null) {
merger = new EventReportMerger(model);
} else {
model.accept(merger);
}
}
return merger.getEventReport();
if (merger == null) {
merger = new EventReportMerger(model);
} else {
return null;
}
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
model.accept(merger);
}
}
return merger.getEventReport();
}
}
package com.dianping.cat.report.page.model.logview;
import java.nio.charset.Charset;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
......@@ -31,8 +30,7 @@ public class HistoricalLogViewService extends BaseHistoricalModelService<String>
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
Date timestamp = new Date(id.getTimestamp());
Bucket<MessageTree> bucket = m_bucketManager.getLogviewBucket(timestamp, id.getDomain());
Bucket<MessageTree> bucket = m_bucketManager.getLogviewBucket(id.getTimestamp(), id.getDomain());
MessageTree tree = null;
if (tag != null && direction != null) {
......
package com.dianping.cat.report.page.model.logview;
import java.nio.charset.Charset;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
......@@ -33,8 +32,7 @@ public class LocalLogViewService extends BaseLocalModelService<String> {
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
Date timestamp = new Date(id.getTimestamp());
Bucket<MessageTree> bucket = m_bucketManager.getLogviewBucket(timestamp, id.getDomain());
Bucket<MessageTree> bucket = m_bucketManager.getLogviewBucket(id.getTimestamp(), id.getDomain());
MessageTree tree = null;
if (tag != null && direction != null) {
......
......@@ -6,15 +6,16 @@ import java.util.List;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.transform.DefaultMerger;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.hadoop.dal.ReportEntity;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HistoricalProblemService extends BaseHistoricalModelService<ProblemReport> {
@Inject
private BucketManager m_bucketManager;
private ReportDao m_reportDao;
public HistoricalProblemService() {
super("problem");
......@@ -24,32 +25,21 @@ public class HistoricalProblemService extends BaseHistoricalModelService<Problem
protected ProblemReport buildModel(ModelRequest request) throws Exception {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
Bucket<String> bucket = null;
List<Report> reports = m_reportDao.findAllByPeriodDomainTypeName(new Date(date), domain, 1, getName(),
ReportEntity.READSET_FULL);
DefaultMerger merger = null;
try {
bucket = m_bucketManager.getReportBucket(new Date(date), getName());
for (Report report : reports) {
String xml = report.getContent();
ProblemReport model = new DefaultXmlParser().parse(xml);
List<String> xmls = bucket.findAllById(domain);
DefaultMerger merger = null;
if (xmls != null) {
for (String xml : xmls) {
ProblemReport model = new DefaultXmlParser().parse(xml);
if (merger == null) {
merger = new DefaultMerger(model);
} else {
model.accept(merger);
}
}
return merger.getProblemReport();
if (merger == null) {
merger = new DefaultMerger(model);
} else {
return null;
}
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
model.accept(merger);
}
}
return merger.getProblemReport();
}
}
package com.dianping.cat.report.page.model.spi.internal;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
......@@ -13,7 +11,7 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.LocalIP;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.report.page.model.spi.ModelRequest;
......@@ -126,15 +124,8 @@ public abstract class BaseCompositeModelService<T> extends ModelServiceWithCalSu
*/
public void setRemoteServers(String servers) {
List<String> endpoints = Splitters.by(',').noEmptyItem().trim().split(servers);
String localAddress = null;
String localHost = null;
try {
localAddress = LocalIP.getAddress();
localHost = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
// ignore it
}
String localAddress = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
String localHost = NetworkInterfaceManager.INSTANCE.getLocalHostName();
for (String endpoint : endpoints) {
int pos = endpoint.indexOf(':');
......
......@@ -5,15 +5,16 @@ import java.util.List;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.hadoop.dal.Report;
import com.dianping.cat.hadoop.dal.ReportDao;
import com.dianping.cat.hadoop.dal.ReportEntity;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.internal.BaseHistoricalModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HistoricalTransactionService extends BaseHistoricalModelService<TransactionReport> {
@Inject
private BucketManager m_bucketManager;
private ReportDao m_reportDao;
public HistoricalTransactionService() {
super("transaction");
......@@ -23,32 +24,21 @@ public class HistoricalTransactionService extends BaseHistoricalModelService<Tra
protected TransactionReport buildModel(ModelRequest request) throws Exception {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
Bucket<String> bucket = null;
List<Report> reports = m_reportDao.findAllByPeriodDomainTypeName(new Date(date), domain, 1, getName(),
ReportEntity.READSET_FULL);
TransactionReportMerger merger = null;
try {
bucket = m_bucketManager.getReportBucket(new Date(date), getName());
for (Report report : reports) {
String xml = report.getContent();
TransactionReport model = new DefaultXmlParser().parse(xml);
List<String> xmls = bucket.findAllById(domain);
TransactionReportMerger merger = null;
if (xmls != null) {
for (String xml : xmls) {
TransactionReport model = new DefaultXmlParser().parse(xml);
if (merger == null) {
merger = new TransactionReportMerger(model);
} else {
model.accept(merger);
}
}
return merger.getTransactionReport();
if (merger == null) {
merger = new TransactionReportMerger(model);
} else {
return null;
}
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
model.accept(merger);
}
}
return merger.getTransactionReport();
}
}
......@@ -18,6 +18,7 @@ Welcome to <b>Central Application Tracking (CAT)</b>.
<br>
<br>
<br>
<a href="?op=checkpoint" style="color:#FFF">Do checkpoint here</a>
<br>
<br>
<br>
......@@ -26,25 +27,4 @@ Welcome to <b>Central Application Tracking (CAT)</b>.
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<br>
<a href="?op=checkpoint" style="color:#FFF">Do checkpoint here</a>
</a:body>
\ No newline at end of file
......@@ -59,7 +59,7 @@
<dependency>
<groupId>com.site.common</groupId>
<artifactId>web-framework</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>org.unidal.webres</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册