提交 ff88ee35 编写于 作者: Y youyong

Merge branch 'master' of ssh://192.168.8.22:58422/cat

......@@ -16,6 +16,7 @@ import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.consumer.transaction.TransactionReportAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.storage.BucketManager;
......@@ -57,8 +58,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.config(E("reportPath").value("target/report/transaction/")));
all.add(C(TransactionAnalyzer.class).is(PER_LOOKUP) //
.req(MessageManager.class, BucketManager.class) //
.req(MessageStorage.class, "html"));
.req(BucketManager.class, MessagePathBuilder.class));
all.add(C(IpAnalyzer.class).is(PER_LOOKUP));
......
......@@ -113,7 +113,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
@Override
public void initialize() throws InitializationException {
String path = m_pathBuilder.getLogViewPath(new Date(m_startTime));
String path = m_pathBuilder.getMessagePath(new Date(m_startTime));
try {
m_messageBucket = m_bucketManager.getMessageBucket(path);
......@@ -264,7 +264,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
void storeLogviews() {
String path = m_pathBuilder.getLogViewPath(new Date(m_startTime));
String path = m_pathBuilder.getMessagePath(new Date(m_startTime));
Bucket<byte[]> bucket = null;
try {
......
......@@ -95,15 +95,11 @@
<implementation>com.dianping.cat.consumer.transaction.TransactionAnalyzer</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.storage.BucketManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>html</role-hint>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
......
......@@ -97,7 +97,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(Bucket.class, byte[].class.getName(), DefaultBucket.class));
all.add(C(Bucket.class, MessageTree.class.getName(), DefaultMessageBucket.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class));
all.add(C(BucketManager.class, DefaultBucketManager.class) //
.config(E("baseDir").value("target/bucket/")));
return all;
}
......
......@@ -13,7 +13,7 @@ public interface MessagePathBuilder {
public String getLogViewPath(String messageId);
public String getLogViewPath(Date timestamp);
public String getMessagePath(Date timestamp);
public String getReportPath(Date timestamp);
}
......@@ -92,7 +92,7 @@ public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializa
}
@Override
public String getLogViewPath(Date timestamp) {
public String getMessagePath(Date timestamp) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/logview");
return format.format(new Object[] { timestamp });
......
package com.dianping.cat.storage;
import java.io.File;
import java.io.IOException;
import java.util.List;
......@@ -10,7 +11,7 @@ public interface Bucket<T> extends TagThreadSupport<T> {
public T findById(String id);
public void initialize(Class<?> type, String path) throws IOException;
public void initialize(Class<?> type, File path) throws IOException;
public boolean storeById(String id, T data);
......
......@@ -19,14 +19,10 @@ import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.TagThreadSupport;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSupport<T>, LogEnabled {
private static final String[] EMPTY = new String[0];
@Inject
private String m_baseDir;
// key => offset of record
private Map<String, Long> m_idToOffsets = new HashMap<String, Long>();
......@@ -152,10 +148,10 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
}
@Override
public void initialize(Class<?> type, String path) throws IOException {
public void initialize(Class<?> type, File path) throws IOException {
m_writeLock = new ReentrantLock();
m_readLock = new ReentrantLock();
m_file = new File(m_baseDir, path);
m_file = path;
m_file.getParentFile().mkdirs();
m_writeFile = new RandomAccessFile(m_file, "rw");
m_readFile = new RandomAccessFile(m_file, "r");
......@@ -165,6 +161,8 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
}
}
protected abstract boolean isAutoFlush();
protected void loadIndexes() throws IOException {
byte[] data = new byte[8192];
......@@ -207,10 +205,6 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
}
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
@Override
public boolean storeById(String id, T data) {
return storeById(id, data, EMPTY);
......@@ -251,9 +245,10 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
m_writeFile.write('\n');
m_writeFile.write(buf.array(), buf.readerIndex(), length);
m_writeFile.write('\n');
// TODO add a flag
m_writeFile.getChannel().force(true);
if (isAutoFlush()) {
m_writeFile.getChannel().force(true);
}
updateIndex(id, tags, offset);
......
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
......@@ -45,7 +46,7 @@ public class DefaultBucket<T> extends AbstractFileBucket<T> {
}
@Override
public void initialize(Class<?> type, String path) throws IOException {
public void initialize(Class<?> type, File path) throws IOException {
super.initialize(type, path);
m_type = type;
......@@ -55,4 +56,9 @@ public class DefaultBucket<T> extends AbstractFileBucket<T> {
"Only String or byte[] are supported so far, but was %s.", m_type));
}
}
@Override
protected boolean isAutoFlush() {
return true;
}
}
package com.dianping.cat.storage.internal;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
......@@ -10,14 +11,18 @@ import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultBucketManager extends ContainerHolder implements BucketManager, Disposable {
@Inject
private String m_baseDir;
private Map<Entry, Bucket<?>> m_map = new HashMap<Entry, Bucket<?>>();
protected Bucket<?> createBucket(String path, Class<?> type) throws IOException {
Bucket<?> bucket = lookup(Bucket.class, type.getName());
bucket.initialize(type, path);
bucket.initialize(type, new File(m_baseDir, path));
return bucket;
}
......@@ -66,6 +71,10 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
return getBucket(String.class, path);
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
static class Entry {
private Class<?> m_type;
......
......@@ -26,6 +26,11 @@ public class DefaultMessageBucket extends AbstractFileBucket<MessageTree> {
m_codec.encode(tree, buf);
}
@Override
protected boolean isAutoFlush() {
return true;
}
public void setCodec(MessageCodec codec) {
m_codec = codec;
}
......
......@@ -208,6 +208,9 @@
<component>
<role>com.dianping.cat.storage.BucketManager</role>
<implementation>com.dianping.cat.storage.internal.DefaultBucketManager</implementation>
<configuration>
<baseDir>target/bucket/</baseDir>
</configuration>
</component>
</components>
</plexus>
package com.dianping.cat.storage;
import java.io.File;
import junit.framework.Assert;
import org.junit.Test;
......@@ -51,7 +53,7 @@ public class BucketTest extends ComponentTestCase {
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(byte[].class, "target/bucket/bytes");
bucket.initialize(byte[].class, new File("target/bucket/bytes"));
// store it and load it
for (int i = 0; i < 100; i++) {
......@@ -97,7 +99,7 @@ public class BucketTest extends ComponentTestCase {
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(MessageTree.class, "target/bucket/message");
bucket.initialize(MessageTree.class, new File("target/bucket/message"));
// check next message in the same thread
for (int i = 0; i < groups - 1; i++) {
......@@ -133,7 +135,7 @@ public class BucketTest extends ComponentTestCase {
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(String.class, "target/bucket/data");
bucket.initialize(String.class, new File("target/bucket/data"));
// store it and load it
for (int i = 0; i < 100; i++) {
......
......@@ -2,7 +2,7 @@ package com.dianping.cat.report.page.transaction;
public enum Action implements com.site.web.mvc.Action {
VIEW("view"),
GRAPHS("graphs"),
GRAPH("graph");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册