提交 6727ccce 编写于 作者: F Frankie Wu

refactory

上级 ba92e711
......@@ -43,7 +43,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(ProblemAnalyzer.class).is(PER_LOOKUP) //
.req(Handler.class, new String[] { FAILURE.getName(), LONG_URL.getName() }, "m_handlers") //
.req(BucketManager.class, MessagePathBuilder.class));
.req(BucketManager.class));
all.add(C(TransactionAnalyzer.class).is(PER_LOOKUP) //
.req(BucketManager.class, MessagePathBuilder.class));
......
......@@ -22,7 +22,6 @@ import com.dianping.cat.consumer.problem.model.entity.Segment;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlBuilder;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
......@@ -34,9 +33,6 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
@Inject
private BucketManager m_bucketManager;
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private List<Handler> m_handlers;
......@@ -119,21 +115,21 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
void loadReports() {
String path = m_pathBuilder.getReportPath(new Date(m_startTime));
Date timestamp = new Date(m_startTime);
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(path);
bucket = m_bucketManager.getReportBucket(timestamp, "problem");
for (String id : bucket.getIdsByPrefix("problem-")) {
for (String id : bucket.getIdsByPrefix("")) {
String xml = bucket.findById(id);
ProblemReport report = parser.parse(xml);
m_reports.put(report.getDomain(), report);
}
} catch (Exception e) {
m_logger.error(String.format("Error when loading problem reports from %s!", path), e);
m_logger.error(String.format("Error when loading problem reports of %s!", timestamp), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
......@@ -165,8 +161,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
String messageId = tree.getMessageId();
try {
String path = m_pathBuilder.getMessagePath(domain, new Date(m_startTime));
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(path);
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain);
bucket.storeById(messageId, tree);
} catch (IOException e) {
......@@ -205,24 +200,24 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
void storeReports(Collection<ProblemReport> reports) {
String path = m_pathBuilder.getReportPath(new Date(m_startTime));
Bucket<String> bucket = null;
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(path);
bucket = m_bucketManager.getReportBucket(timestamp, "problem");
// delete old one, not append mode
bucket.deleteAndCreate();
for (ProblemReport report : reports) {
String xml = builder.buildXml(report);
String key = "failure-" + report.getDomain();
String domain = report.getDomain();
bucket.storeById(key, xml);
bucket.storeById(domain, xml);
}
} catch (Exception e) {
m_logger.error(String.format("Error when storing transaction reports to %s!", path), e);
m_logger.error(String.format("Error when storing transaction reports of %s!", timestamp), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
......
......@@ -120,21 +120,21 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
void loadReports() {
String path = m_pathBuilder.getReportPath(new Date(m_startTime));
Date timestamp = new Date(m_startTime);
DefaultXmlParser parser = new DefaultXmlParser();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(path);
bucket = m_bucketManager.getReportBucket(timestamp, "transaction");
for (String id : bucket.getIdsByPrefix("transaction-")) {
for (String id : bucket.getIdsByPrefix("")) {
String xml = bucket.findById(id);
TransactionReport report = parser.parse(xml);
m_reports.put(report.getDomain(), report);
}
} catch (Exception e) {
m_logger.error(String.format("Error when loading problem reports from %s!", path), e);
m_logger.error(String.format("Error when loading problem reports of %s!", timestamp), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
......@@ -165,8 +165,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String messageId = tree.getMessageId();
try {
String path = m_pathBuilder.getMessagePath(domain, new Date(m_startTime));
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(path);
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain);
bucket.storeById(messageId, tree);
} catch (IOException e) {
......@@ -279,24 +278,24 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
void storeReports(Collection<TransactionReport> reports) {
String path = m_pathBuilder.getReportPath(new Date(m_startTime));
Bucket<String> bucket = null;
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(path);
bucket = m_bucketManager.getReportBucket(timestamp, "transaction");
// delete old one, not append mode
bucket.deleteAndCreate();
for (TransactionReport report : reports) {
String xml = builder.buildXml(report);
String key = "transaction-" + report.getDomain();
String domain = report.getDomain();
bucket.storeById(key, xml);
bucket.storeById(domain, xml);
}
} catch (Exception e) {
m_logger.error(String.format("Error when storing transaction reports to %s!", path), e);
m_logger.error(String.format("Error when storing transaction reports to %s!", timestamp), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
......
......@@ -51,9 +51,6 @@
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
......@@ -74,18 +71,5 @@
<implementation>com.dianping.cat.consumer.ip.IpAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.consumer.logview.LogViewPostHandler</role>
<implementation>com.dianping.cat.consumer.logview.LogViewPostHandler</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
</components>
</plexus>
......@@ -4,12 +4,13 @@ import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.dianping.cat.storage.internal.LocalStringBucket;
import com.dianping.cat.storage.internal.DefaultBucketManager;
import com.dianping.cat.storage.internal.LocalMessageBucket;
import com.dianping.cat.storage.internal.LocalStringBucket;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -19,12 +20,12 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator {
List<Component> all = new ArrayList<Component>();
all.add(C(Bucket.class, String.class.getName(), LocalStringBucket.class) //
.is(PER_LOOKUP));
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
all.add(C(Bucket.class, MessageTree.class.getName(), LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class) //
.config(E("baseDir").value("target/bucket/")));
all.add(C(BucketManager.class, DefaultBucketManager.class));
return all;
}
......
......@@ -12,5 +12,5 @@ public interface MessagePathBuilder {
public String getMessagePath(String domain, Date timestamp);
public String getReportPath(Date timestamp);
public String getReportPath(String name, Date timestamp);
}
......@@ -64,10 +64,10 @@ public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializa
}
@Override
public String getReportPath(Date timestamp) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/report");
public String getReportPath(String name, Date timestamp) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/report-{1}");
return format.format(new Object[] { timestamp });
return format.format(new Object[] { timestamp, name });
}
@Override
......
package com.dianping.cat.storage;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
public interface Bucket<T> {
public void close() throws IOException;
......@@ -12,14 +12,14 @@ public interface Bucket<T> {
public T findById(String id) throws IOException;
public T findNextById(String id, String tag) throws IOException;
public T findPreviousById(String id, String tag) throws IOException;
public void flush() throws IOException;
public Collection<String> getIdsByPrefix(String prefix);
public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException;;
public void initialize(Class<?> type, String name, Date timestamp) throws IOException;;
public boolean storeById(String id, T data) throws IOException;;
}
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.Date;
import com.dianping.cat.message.spi.MessageTree;
public interface BucketManager {
public void closeBucket(Bucket<?> bucket);
public Bucket<MessageTree> getMessageBucket(String path) throws IOException;
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain) throws IOException;
public Bucket<String> getReportBucket(String path) throws IOException;
public Bucket<String> getReportBucket(Date timestamp, String name) throws IOException;
}
......@@ -6,6 +6,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -20,10 +21,13 @@ import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.storage.Bucket;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
private static final String[] EMPTY = new String[0];
@Inject
private String m_baseDir = "target/bucket";
// key => offset of record
private Map<String, Long> m_idToOffsets = new HashMap<String, Long>();
......@@ -180,11 +184,10 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
}
@Override
public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException {
File path = new File(baseDir, logicalPath);
public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
m_writeLock = new ReentrantLock();
m_readLock = new ReentrantLock();
m_file = path;
m_file = new File(m_baseDir, getLogicalPath(timestamp, name));
m_file.getParentFile().mkdirs();
m_writeFile = new RandomAccessFile(m_file, "rw");
m_readFile = new RandomAccessFile(m_file, "r");
......@@ -194,6 +197,8 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
}
}
protected abstract String getLogicalPath(Date timestamp, String name);
protected abstract boolean isAutoFlush();
protected void loadIndexes() throws IOException {
......
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
......@@ -11,28 +11,25 @@ import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultBucketManager extends ContainerHolder implements BucketManager, Disposable {
@Inject
private String m_baseDir;
private Map<Entry, Bucket<?>> m_map = new HashMap<Entry, Bucket<?>>();
@Override
public void closeBucket(Bucket<?> bucket) {
try {
bucket.close();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
// ignore it
}
release(bucket);
}
protected Bucket<?> createBucket(String path, Class<?> type) throws IOException {
protected Bucket<?> createBucket(Class<?> type, Date timestamp, String name) throws IOException {
Bucket<?> bucket = lookup(Bucket.class, type.getName());
bucket.initialize(type, new File(m_baseDir), path);
bucket.initialize(type, name, timestamp);
return bucket;
}
......@@ -44,12 +41,8 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
}
@SuppressWarnings("unchecked")
protected <T> Bucket<T> getBucket(Class<T> type, String path) throws IOException {
if (type == null || path == null) {
throw new IllegalArgumentException(String.format("Type(%s) or path(%s) can't be null.", type, path));
}
Entry entry = new Entry(type, path);
protected <T> Bucket<T> getBucket(Class<T> type, Date timestamp, String name) throws IOException {
Entry entry = new Entry(type, timestamp, name);
Bucket<?> bucket = m_map.get(entry);
if (bucket == null) {
......@@ -57,7 +50,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
bucket = m_map.get(entry);
if (bucket == null) {
bucket = createBucket(path, type);
bucket = createBucket(type, timestamp, name);
m_map.put(entry, bucket);
}
}
......@@ -67,27 +60,26 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
}
@Override
public Bucket<MessageTree> getMessageBucket(String path) throws IOException {
return getBucket(MessageTree.class, path);
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain) throws IOException {
return getBucket(MessageTree.class, timestamp, domain);
}
@Override
public Bucket<String> getReportBucket(String path) throws IOException {
return getBucket(String.class, path);
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
public Bucket<String> getReportBucket(Date timestamp, String name) throws IOException {
return getBucket(String.class, timestamp, name);
}
static class Entry {
private Class<?> m_type;
private String m_path;
private Date m_timestamp;
private String m_name;
public Entry(Class<?> type, String path) {
public Entry(Class<?> type, Date timestamp, String name) {
m_type = type;
m_path = path;
m_timestamp = timestamp;
m_name = name;
}
@Override
......@@ -95,14 +87,19 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
if (obj instanceof Entry) {
Entry e = (Entry) obj;
return e.getType() == m_type && e.getPath().equals(m_path);
return e.getType() == m_type && e.getTimestamp().getTime() == m_timestamp.getTime()
&& e.getName().equals(m_name);
}
return false;
}
public String getPath() {
return m_path;
public String getName() {
return m_name;
}
public Date getTimestamp() {
return m_timestamp;
}
public Class<?> getType() {
......@@ -113,7 +110,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
public int hashCode() {
int hashcode = m_type.hashCode();
hashcode = hashcode * 31 + m_path.hashCode();
hashcode = hashcode * 31 + m_name.hashCode();
return hashcode;
}
}
......
package com.dianping.cat.storage.internal;
import java.io.IOException;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.annotation.Inject;
......@@ -13,6 +15,9 @@ public class LocalMessageBucket extends AbstractFileBucket<MessageTree> {
@Inject
private MessageCodec m_codec;
@Inject
private MessagePathBuilder m_pathBuilder;
@Override
protected MessageTree decode(ChannelBuffer buf) throws IOException {
MessageTree tree = new DefaultMessageTree();
......@@ -43,4 +48,9 @@ public class LocalMessageBucket extends AbstractFileBucket<MessageTree> {
public void setCodec(MessageCodec codec) {
m_codec = codec;
}
@Override
protected String getLogicalPath(Date timestamp, String domain) {
return m_pathBuilder.getMessagePath(domain, timestamp);
}
}
......@@ -2,12 +2,19 @@ package com.dianping.cat.storage.internal;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Date;
import org.jboss.netty.buffer.ChannelBuffer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.site.lookup.annotation.Inject;
public class LocalStringBucket extends AbstractFileBucket<String> {
private static final String[] EMPTY = new String[0];
@Inject
private MessagePathBuilder m_pathBuilder;
@Override
protected String decode(ChannelBuffer buf) throws IOException {
return (String) buf.toString(buf.readerIndex(), buf.readableBytes(), Charset.forName("utf-8"));
......@@ -31,4 +38,9 @@ public class LocalStringBucket extends AbstractFileBucket<String> {
protected boolean isAutoFlush() {
return true;
}
@Override
protected String getLogicalPath(Date timestamp, String name) {
return m_pathBuilder.getReportPath(name, timestamp);
}
}
......@@ -195,19 +195,18 @@
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>[B</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultBucket</implementation>
<implementation>com.dianping.cat.storage.internal.LocalStringBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree</role-hint>
<implementation>com.dianping.cat.storage.internal.DefaultMessageBucket</implementation>
<implementation>com.dianping.cat.storage.internal.LocalMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
......@@ -219,9 +218,6 @@
<component>
<role>com.dianping.cat.storage.BucketManager</role>
<implementation>com.dianping.cat.storage.internal.DefaultBucketManager</implementation>
<configuration>
<baseDir>target/bucket/</baseDir>
</configuration>
</component>
</components>
</plexus>
......@@ -2,6 +2,7 @@ package com.dianping.cat.storage;
import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -44,8 +45,9 @@ public class BucketConcurrentTest extends ComponentTestCase {
@Test
public void testMessageBucket() throws Exception {
Date timestamp = new Date();
BucketManager manager = lookup(BucketManager.class);
final Bucket<MessageTree> bucket = manager.getMessageBucket("concurrent/message");
final Bucket<MessageTree> bucket = manager.getMessageBucket(timestamp, "concurrent/message");
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int p = 0; p < 10; p++) {
......@@ -108,8 +110,9 @@ public class BucketConcurrentTest extends ComponentTestCase {
@Test
public void testStringBucket() throws Exception {
Date timestamp = new Date();
BucketManager manager = lookup(BucketManager.class);
final Bucket<String> bucket = manager.getReportBucket("concurrent/data");
final Bucket<String> bucket = manager.getReportBucket(timestamp, "concurrent/data");
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int p = 0; p < 10; p++) {
......@@ -137,7 +140,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
pool.awaitTermination(5000, TimeUnit.MILLISECONDS);
final Bucket<String> bucket2 = manager.getReportBucket("concurrent/data");
final Bucket<String> bucket2 = manager.getReportBucket(timestamp, "concurrent/data");
for (int p = 0; p < 10; p++) {
final int num = p;
......
package com.dianping.cat.storage;
import java.util.Date;
import junit.framework.Assert;
import org.junit.Test;
......@@ -13,11 +15,12 @@ import com.site.lookup.ComponentTestCase;
public class BucketManagerTest extends ComponentTestCase {
@Test
public void test() throws Exception {
Date timestamp = new Date();
BucketManager manager = lookup(BucketManager.class);
Bucket<MessageTree> bucket1 = manager.getMessageBucket("test/path1");
Bucket<MessageTree> bucket2 = manager.getMessageBucket("test/path2");
Bucket<MessageTree> bucket3 = manager.getMessageBucket("test/path1");
Bucket<MessageTree> bucket4 = manager.getMessageBucket("test/path2");
Bucket<MessageTree> bucket1 = manager.getMessageBucket(timestamp, "test/path1");
Bucket<MessageTree> bucket2 = manager.getMessageBucket(timestamp, "test/path2");
Bucket<MessageTree> bucket3 = manager.getMessageBucket(timestamp, "test/path1");
Bucket<MessageTree> bucket4 = manager.getMessageBucket(timestamp, "test/path2");
Assert.assertEquals(bucket1, bucket3);
Assert.assertEquals(bucket2, bucket4);
......
......@@ -3,6 +3,7 @@ package com.dianping.cat.report.build;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.job.DumpToHdfsConsumer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
......@@ -23,7 +24,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(MessageConsumer.class, new String[] { "realtime" }, "m_consumers"));
} else {
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { "realtime" }, "m_consumers"));
.req(MessageConsumer.class, new String[] { "realtime", DumpToHdfsConsumer.ID }, "m_consumers"));
}
all.add(C(ValueTranslater.class, DefaultValueTranslater.class));
......
......@@ -5,7 +5,6 @@ import java.util.List;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.report.page.model.ip.CompositeIpService;
import com.dianping.cat.report.page.model.ip.LocalIpService;
import com.dianping.cat.report.page.model.logview.CompositeLogViewService;
......@@ -30,14 +29,14 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
all.add(C(ModelService.class, "transaction-local", LocalTransactionService.class) //
.req(MessageConsumer.class, "realtime"));
all.add(C(ModelService.class, "transaction-hdfs", HdfsTransactionService.class) //
.req(BucketManager.class, MessagePathBuilder.class));
.req(BucketManager.class));
all.add(C(ModelService.class, "transaction", CompositeTransactionService.class) //
.req(ModelService.class, new String[] { "transaction-local", "transaction-hdfs" }, "m_services"));
all.add(C(ModelService.class, "problem-local", LocalProblemService.class) //
.req(MessageConsumer.class, "realtime"));
all.add(C(ModelService.class, "problem-hdfs", HdfsProblemService.class) //
.req(BucketManager.class, MessagePathBuilder.class));
.req(BucketManager.class));
all.add(C(ModelService.class, "problem", CompositeProblemService.class) //
.req(ModelService.class, new String[] { "problem-local", "problem-hdfs" }, "m_services"));
......@@ -48,10 +47,10 @@ class ServiceComponentConfigurator extends AbstractResourceConfigurator {
.req(ModelService.class, new String[] { "ip-local" }, "m_services"));
all.add(C(ModelService.class, "logview-local", LocalLogViewService.class) //
.req(MessagePathBuilder.class, BucketManager.class) //
.req(BucketManager.class) //
.req(MessageCodec.class, "html"));
all.add(C(ModelService.class, "logview-hdfs", HdfsLogViewService.class) //
.req(MessagePathBuilder.class, BucketManager.class) //
.req(BucketManager.class) //
.req(MessageCodec.class, "html", "m_htmlCodec") //
.req(MessageCodec.class, "plain-text", "m_plainCodec"));
all.add(C(ModelService.class, "logview", CompositeLogViewService.class) //
......
......@@ -8,7 +8,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
......@@ -18,9 +17,6 @@ import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class HdfsLogViewService implements ModelService<String> {
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private BucketManager m_bucketManager;
......@@ -33,11 +29,10 @@ public class HdfsLogViewService implements ModelService<String> {
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
String path = m_pathBuilder.getMessagePath(id.getDomain(), new Date(id.getTimestamp()));
ModelResponse<String> response = new ModelResponse<String>();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(path);
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain());
MessageTree tree = null;
if (tag != null && direction != null) {
......
......@@ -8,7 +8,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
......@@ -18,9 +17,6 @@ import com.dianping.cat.storage.BucketManager;
import com.site.lookup.annotation.Inject;
public class LocalLogViewService implements ModelService<String> {
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private BucketManager m_bucketManager;
......@@ -33,11 +29,10 @@ public class LocalLogViewService implements ModelService<String> {
String direction = request.getProperty("direction");
String tag = request.getProperty("tag");
MessageId id = MessageId.parse(messageId);
String path = m_pathBuilder.getMessagePath(id.getDomain(), new Date(id.getTimestamp()));
ModelResponse<String> response = new ModelResponse<String>();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(path);
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain());
MessageTree tree = null;
if (tag != null && direction != null) {
......
......@@ -4,7 +4,6 @@ import java.util.Date;
import com.dianping.cat.consumer.problem.model.entity.ProblemReport;
import com.dianping.cat.consumer.problem.model.transform.DefaultXmlParser;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
......@@ -16,19 +15,15 @@ public class HdfsProblemService implements ModelService<ProblemReport> {
@Inject
private BucketManager m_bucketManager;
@Inject
private MessagePathBuilder m_pathBuilder;
@Override
public ModelResponse<ProblemReport> invoke(ModelRequest request) {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
String path = m_pathBuilder.getReportPath(new Date(date));
ModelResponse<ProblemReport> response = new ModelResponse<ProblemReport>();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(path);
bucket = m_bucketManager.getReportBucket(new Date(date), domain);
String xml = bucket.findById("problem-" + domain);
......
......@@ -4,7 +4,6 @@ import java.util.Date;
import com.dianping.cat.consumer.transaction.model.entity.TransactionReport;
import com.dianping.cat.consumer.transaction.model.transform.DefaultXmlParser;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
......@@ -16,21 +15,17 @@ public class HdfsTransactionService implements ModelService<TransactionReport> {
@Inject
private BucketManager m_bucketManager;
@Inject
private MessagePathBuilder m_pathBuilder;
@Override
public ModelResponse<TransactionReport> invoke(ModelRequest request) {
String domain = request.getDomain();
long date = Long.parseLong(request.getProperty("date"));
String path = m_pathBuilder.getReportPath(new Date(date));
ModelResponse<TransactionReport> response = new ModelResponse<TransactionReport>();
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(path);
bucket = m_bucketManager.getReportBucket(new Date(date), domain);
String xml = bucket.findById("transaction-" + domain);
String xml = bucket.findById(domain);
if (xml != null) {
TransactionReport report = new DefaultXmlParser().parse(xml);
......
......@@ -45,9 +45,6 @@
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
......@@ -84,9 +81,6 @@
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
<component>
......@@ -134,9 +128,6 @@
<role-hint>logview-local</role-hint>
<implementation>com.dianping.cat.report.page.model.logview.LocalLogViewService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
......@@ -151,9 +142,6 @@
<role-hint>logview-hdfs</role-hint>
<implementation>com.dianping.cat.report.page.model.logview.HdfsLogViewService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
......@@ -374,9 +362,6 @@
<role>com.dianping.cat.report.page.model.logview.LocalLogViewService</role>
<implementation>com.dianping.cat.report.page.model.logview.LocalLogViewService</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
......
......@@ -5,7 +5,7 @@ import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class HdfsDumpConsumer implements MessageConsumer {
public class DumpToHdfsConsumer implements MessageConsumer {
public static final String ID = "dump-to-hdfs";
@Inject
......
......@@ -3,10 +3,11 @@ package com.dianping.cat.job.build;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.job.HdfsDumpConsumer;
import com.dianping.cat.job.DumpToHdfsConsumer;
import com.dianping.cat.job.hdfs.DefaultOutputChannel;
import com.dianping.cat.job.hdfs.DefaultOutputChannelManager;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.job.hdfs.InputChannelManager;
import com.dianping.cat.job.hdfs.OutputChannel;
import com.dianping.cat.job.hdfs.OutputChannelManager;
import com.dianping.cat.job.sql.dal.LogviewDao;
......@@ -42,12 +43,12 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class) //
.config(E("baseDir").value("data"), //
E("serverUri").value("/catlog")));
E("serverUri").value("hdfs://192.168.7.43:9000/user/cat/")));
}
all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
.req(OutputChannelManager.class));
all.add(C(MessageConsumer.class, HdfsDumpConsumer.ID, HdfsDumpConsumer.class) //
all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) //
.req(MessageStorage.class, "hdfs"));
if (isEnv("dev") || property("env", null) == null) {
......@@ -62,7 +63,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(ReportDao.class));
all.add(C(Bucket.class, MessageTree.class.getName(), RemoteMessageBucket.class) //
.is(PER_LOOKUP) //
.req(LogviewDao.class) //
.req(LogviewDao.class, MessagePathBuilder.class) //
.req(OutputChannelManager.class, InputChannelManager.class) //
.req(MessageCodec.class, "plain-text"));
}
......
package com.dianping.cat.job.storage;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.Date;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -15,6 +15,7 @@ import com.dianping.cat.job.hdfs.OutputChannelManager;
import com.dianping.cat.job.sql.dal.Logview;
import com.dianping.cat.job.sql.dal.LogviewDao;
import com.dianping.cat.job.sql.dal.LogviewEntity;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.dal.jdbc.DalException;
......@@ -27,6 +28,9 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
@Inject
private InputChannelManager m_inputChannelManager;
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private LogviewDao m_logviewDao;
......@@ -112,9 +116,11 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
}
@Override
public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException {
public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
String ipAddress = InetAddress.getLocalHost().getHostAddress();
String logicalPath = m_pathBuilder.getMessagePath(name, timestamp);
// TODO make it lazy
m_path = logicalPath + "-" + ipAddress + "-" + System.currentTimeMillis();
m_outputChannel = m_outputChannelManager.openChannel(m_path, false);
}
......
package com.dianping.cat.job.storage;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
......@@ -95,14 +92,8 @@ public class RemoteStringBucket implements Bucket<String>, LogEnabled {
}
@Override
public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException {
SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd'/'HH'/report'");
try {
m_period = format.parse(logicalPath);
} catch (ParseException e) {
throw new IOException(String.format("Unable to parse date out of logicalPath(%s)!", logicalPath), e);
}
public void initialize(Class<?> type, String name, Date timestamp) throws IOException {
m_period = timestamp;
}
@Override
......
......@@ -11,9 +11,9 @@
<member name="creation-date" field="creation_date" value-type="Date" nullable="false" />
<var name="key-message-id" value-type="String" key-member="message-id" />
<primary-key name="PRIMARY" members="message_id" />
<index name="mid_thread" members="message_id ASC, tag_thread ASC" />
<index name="mid_session" members="message_id ASC, tag_session ASC" />
<index name="mid_request" members="message_id ASC, tag_request ASC" />
<index name="tag_thread" members="tag_thread ASC" />
<index name="tag_session" members="tag_session ASC" />
<index name="tag_request" members="tag_request ASC" />
<readsets>
<readset name="FULL" all="true" />
</readsets>
......@@ -39,16 +39,16 @@
</query-defs>
</entity>
<entity name="report" table="report" alias="r">
<member name="id" field="id" value-type="int" length="10" nullable="false" key="true" />
<member name="id" field="id" value-type="int" length="10" nullable="false" key="true" auto-increment="true" />
<member name="type" field="type" value-type="int" length="3" nullable="false" />
<member name="name" field="name" value-type="String" length="20" />
<member name="domain" field="domain" value-type="String" length="20" />
<member name="period" field="period" value-type="Date" />
<member name="content" field="content" value-type="String" length="16777215" />
<member name="creation-date" field="creation_date" value-type="Date" />
<member name="name" field="name" value-type="String" length="20" nullable="false" />
<member name="domain" field="domain" value-type="String" length="20" nullable="false" />
<member name="period" field="period" value-type="Date" nullable="false" />
<member name="content" field="content" value-type="String" length="16777215" nullable="false" />
<member name="creation-date" field="creation_date" value-type="Date" nullable="false" />
<var name="key-id" value-type="int" key-member="id" />
<primary-key name="PRIMARY" members="id" />
<index name="period_name" members="period ASC, domain ASC" />
<index name="period_domain_name_type" unique="true" members="period ASC, domain ASC, name ASC, type ASC" />
<readsets>
<readset name="FULL" all="true" />
</readsets>
......
......@@ -11,28 +11,28 @@
<param name="tag-session" />
<param name="tag-request" />
<statement><![CDATA[
SELECT <FIELDS/>
FROM <TABLE/>
WHERE 1 = 1
<IF type='NOT_NULL' field='tag-thread'>
AND <FIELD name='tag-thread'/> = ${tag-thread}
</IF>
<IF type='NOT_NULL' field='tag-session'>
AND <FIELD name='tag-session'/> = ${tag-session}
</IF>
<IF type='NOT_NULL' field='tag-request'>
AND <FIELD name='tag-request'/> = ${tag-request}
</IF>
<IF type='EQ' field='direction' value='true'>
AND <FIELD name='message-id'/> \\> ${message-id}
ORDER BY <FIELD name='message-id'/> ASC
</IF>
<IF type='EQ' field='direction' value='false'>
AND <FIELD name='message-id'/> \\< ${message-id}
ORDER BY <FIELD name='message-id'/> DESC
</IF>
LIMIT 1
]]></statement>
SELECT <FIELDS/>
FROM <TABLE/>
WHERE 1 = 1
<IF type='NOT_NULL' field='tag-thread'>
AND <FIELD name='tag-thread'/> = ${tag-thread}
</IF>
<IF type='NOT_NULL' field='tag-session'>
AND <FIELD name='tag-session'/> = ${tag-session}
</IF>
<IF type='NOT_NULL' field='tag-request'>
AND <FIELD name='tag-request'/> = ${tag-request}
</IF>
<IF type='EQ' field='direction' value='true'>
AND <FIELD name='message-id'/> \\> ${message-id}
ORDER BY <FIELD name='message-id'/> ASC
</IF>
<IF type='EQ' field='direction' value='false'>
AND <FIELD name='message-id'/> \\< ${message-id}
ORDER BY <FIELD name='message-id'/> DESC
</IF>
LIMIT 1
]]></statement>
</query>
<query name="find-by-message-ids" type="SELECT" multiple="true">
<param name="message-ids" />
......@@ -79,7 +79,8 @@
INTO <TABLE/>(<FIELDS/>)
VALUES(<VALUES/>)
ON DUPLICATE KEY
UPDATE <FIELD name='content'/> = ${content}
UPDATE <FIELD name='content'/> = ${content},
<FIELD name='creation-date'/> = NOW()
]]></statement>
</query>
</query-defs>
......
<?xml version="1.0" encoding="UTF-8"?>
<entities do-package="com.dianping.cat.job.sql.dal" gen="true" />
......@@ -23,18 +23,6 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>hdfs</role-hint>
<implementation>com.dianping.cat.job.hdfs.HdfsBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>hdfs-logview</role-hint>
<implementation>com.dianping.cat.job.hdfs.LogviewBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
......@@ -48,7 +36,7 @@
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dump-to-hdfs</role-hint>
<implementation>com.dianping.cat.job.HdfsDumpConsumer</implementation>
<implementation>com.dianping.cat.job.DumpToHdfsConsumer</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStorage</role>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册