提交 b9a97b48 编写于 作者: F Frankie Wu

checkpoint and reload works

上级 9424fc62
......@@ -19,6 +19,7 @@ import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.Cat;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageQueue;
......@@ -208,6 +209,8 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
m_lastAnalyzers.putAll(m_currentAnalyzers);
m_currentAnalyzers.clear();
Cat.setup("realtime-consumer");
for (String name : m_analyzerNames) {
MessageAnalyzer analyzer = m_factory.create(name, start, m_duration, m_extraTime);
MessageQueue queue = lookup(MessageQueue.class);
......@@ -305,10 +308,16 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
}
public void run() {
m_analyzer.analyze(m_queue);
m_factory.release(m_analyzer);
m_factory.release(m_queue);
m_latch.countDown();
Cat.setup("realtime-consumer-task");
try {
m_analyzer.analyze(m_queue);
m_factory.release(m_analyzer);
m_factory.release(m_queue);
m_latch.countDown();
} finally {
Cat.reset();
}
}
}
}
\ No newline at end of file
......@@ -39,7 +39,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.config(E("failureType").value(failureTypes)));
all.add(C(Handler.class, LONG_URL.getName(), LongUrlHandler.class) //
.config(E("threshold").value("10")));
.config(E("threshold").value("1000")));
all.add(C(ProblemAnalyzer.class).is(PER_LOOKUP) //
.req(Handler.class, new String[] { FAILURE.getName(), LONG_URL.getName() }, "m_handlers") //
......
......@@ -10,6 +10,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -46,9 +47,34 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
private long m_duration;
void closeMessageBuckets(Set<String> set) {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
Bucket<MessageTree> localBucket = null;
Bucket<MessageTree> remoteBucket = null;
try {
localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote");
} catch (Exception e) {
m_logger.error(String.format("Error when getting message bucket of %s!", timestamp), e);
} finally {
if (localBucket != null) {
m_bucketManager.closeBucket(localBucket);
}
if (remoteBucket != null) {
m_bucketManager.closeBucket(remoteBucket);
}
}
}
}
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
}
@Override
......@@ -56,6 +82,18 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
m_logger = logger;
}
private Segment findOrCreateSegment(ProblemReport report, MessageTree tree) {
Machine machine = report.findOrCreateMachine(tree.getIpAddress());
JavaThread thread = machine.findOrCreateThread(tree.getThreadId());
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(tree.getMessage().getTimestamp());
int minute = cal.get(Calendar.MINUTE);
Segment segment = thread.findOrCreateSegment(minute);
return segment;
}
@Override
protected List<ProblemReport> generate() {
List<ProblemReport> reports = new ArrayList<ProblemReport>(m_reports.size());
......@@ -120,7 +158,7 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, "problem");
bucket = m_bucketManager.getReportBucket(timestamp, "problem", "local");
for (String id : bucket.getIdsByPrefix("")) {
String xml = bucket.findById(id);
......@@ -161,27 +199,18 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
String messageId = tree.getMessageId();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain);
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
Bucket<MessageTree> remoteBucket = m_bucketManager
.getMessageBucket(new Date(m_startTime), domain, "remote");
bucket.storeById(messageId, tree);
localBucket.storeById(messageId, tree);
remoteBucket.storeById(messageId, tree);
} catch (IOException e) {
m_logger.error("Error when storing message for problem analyzer!", e);
}
}
}
private Segment findOrCreateSegment(ProblemReport report, MessageTree tree) {
Machine machine = report.findOrCreateMachine(tree.getIpAddress());
JavaThread thread = machine.findOrCreateThread(tree.getThreadId());
Calendar cal = Calendar.getInstance();
cal.setTimeInMillis(tree.getMessage().getTimestamp());
int minute = cal.get(Calendar.MINUTE);
Segment segment = thread.findOrCreateSegment(minute);
return segment;
}
public void setAnalyzerInfo(long startTime, long duration, long extraTime) {
m_extraTime = extraTime;
m_startTime = startTime;
......@@ -197,30 +226,38 @@ public class ProblemAnalyzer extends AbstractMessageAnalyzer<ProblemReport> impl
}
storeReports(reports);
closeMessageBuckets(m_reports.keySet());
}
void storeReports(Collection<ProblemReport> reports) {
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> bucket = null;
Bucket<String> localBucket = null;
Bucket<String> remoteBucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, "problem");
localBucket = m_bucketManager.getReportBucket(timestamp, "problem", "local");
remoteBucket = m_bucketManager.getReportBucket(timestamp, "problem", "remote");
// delete old one, not append mode
bucket.deleteAndCreate();
localBucket.deleteAndCreate();
for (ProblemReport report : reports) {
String xml = builder.buildXml(report);
String domain = report.getDomain();
bucket.storeById(domain, xml);
localBucket.storeById(domain, xml);
remoteBucket.storeById(domain, xml);
}
} catch (Exception e) {
m_logger.error(String.format("Error when storing transaction reports of %s!", timestamp), e);
m_logger.error(String.format("Error when storing problem reports to %s!", timestamp), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
if (localBucket != null) {
m_bucketManager.closeBucket(localBucket);
}
if (remoteBucket != null) {
m_bucketManager.closeBucket(remoteBucket);
}
}
}
......
......@@ -10,6 +10,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -49,9 +50,34 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
private long m_duration;
void closeMessageBuckets(Set<String> set) {
Date timestamp = new Date(m_startTime);
for (String domain : m_reports.keySet()) {
Bucket<MessageTree> localBucket = null;
Bucket<MessageTree> remoteBucket = null;
try {
localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "local");
remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain, "remote");
} catch (Exception e) {
m_logger.error(String.format("Error when getting message bucket of %s!", timestamp), e);
} finally {
if (localBucket != null) {
m_bucketManager.closeBucket(localBucket);
}
if (remoteBucket != null) {
m_bucketManager.closeBucket(remoteBucket);
}
}
}
}
@Override
public void doCheckpoint() throws IOException {
storeReports(m_reports.values());
closeMessageBuckets(m_reports.keySet());
}
@Override
......@@ -125,7 +151,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, "transaction");
bucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local");
for (String id : bucket.getIdsByPrefix("")) {
String xml = bucket.findById(id);
......@@ -134,7 +160,7 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
m_reports.put(report.getDomain(), report);
}
} catch (Exception e) {
m_logger.error(String.format("Error when loading problem reports of %s!", timestamp), e);
m_logger.error(String.format("Error when loading transacion reports of %s!", timestamp), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
......@@ -165,9 +191,13 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String messageId = tree.getMessageId();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain);
Bucket<MessageTree> localBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain,
"local");
Bucket<MessageTree> remoteBucket = m_bucketManager.getMessageBucket(new Date(m_startTime), domain,
"remote");
bucket.storeById(messageId, tree);
localBucket.storeById(messageId, tree);
remoteBucket.storeById(messageId, tree);
} catch (IOException e) {
m_logger.error("Error when storing message for transaction analyzer!", e);
}
......@@ -275,30 +305,38 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
}
storeReports(reports);
closeMessageBuckets(m_reports.keySet());
}
void storeReports(Collection<TransactionReport> reports) {
Date timestamp = new Date(m_startTime);
DefaultXmlBuilder builder = new DefaultXmlBuilder(true);
Bucket<String> bucket = null;
Bucket<String> localBucket = null;
Bucket<String> remoteBucket = null;
try {
bucket = m_bucketManager.getReportBucket(timestamp, "transaction");
localBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "local");
remoteBucket = m_bucketManager.getReportBucket(timestamp, "transaction", "remote");
// delete old one, not append mode
bucket.deleteAndCreate();
localBucket.deleteAndCreate();
for (TransactionReport report : reports) {
String xml = builder.buildXml(report);
String domain = report.getDomain();
bucket.storeById(domain, xml);
localBucket.storeById(domain, xml);
remoteBucket.storeById(domain, xml);
}
} catch (Exception e) {
m_logger.error(String.format("Error when storing transaction reports to %s!", timestamp), e);
m_logger.error(String.format("Error when storing transaction reports of %s!", timestamp), e);
} finally {
if (bucket != null) {
m_bucketManager.closeBucket(bucket);
if (localBucket != null) {
m_bucketManager.closeBucket(localBucket);
}
if (remoteBucket != null) {
m_bucketManager.closeBucket(remoteBucket);
}
}
}
......
......@@ -32,7 +32,7 @@
<role-hint>long-url</role-hint>
<implementation>com.dianping.cat.consumer.problem.handler.LongUrlHandler</implementation>
<configuration>
<threshold>10</threshold>
<threshold>1000</threshold>
</configuration>
</component>
<component>
......
......@@ -19,14 +19,15 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator {
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(Bucket.class, String.class.getName(), LocalStringBucket.class) //
all.add(C(Bucket.class, String.class.getName() + "-local", LocalStringBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
all.add(C(Bucket.class, MessageTree.class.getName(), LocalMessageBucket.class) //
all.add(C(Bucket.class, MessageTree.class.getName() + "-local", LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class));
all.add(C(BucketManager.class, DefaultBucketManager.class) //
.req(MessagePathBuilder.class));
return all;
}
......
......@@ -50,7 +50,14 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
@Override
public void add(Message message) {
if (Cat.isInitialized()) {
getContext().add(this, message);
Context ctx = m_context.get();
if (ctx != null) {
ctx.add(this, message);
} else if (m_clientConfig.isDevMode()) {
throw new RuntimeException("Cat has not been initialized successfully, "
+ "please call Cal.setup(...) first for each thread.");
}
}
}
......@@ -62,7 +69,14 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
@Override
public void end(Transaction transaction) {
if (Cat.isInitialized()) {
getContext().end(this, transaction);
Context ctx = m_context.get();
if (ctx != null) {
ctx.end(this, transaction);
} else if (m_clientConfig.isDevMode()) {
throw new RuntimeException("Cat has not been initialized successfully, "
+ "please call Cal.setup(...) first for each thread.");
}
}
}
......@@ -81,17 +95,6 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
return m_clientConfig;
}
Context getContext() {
Context ctx = m_context.get();
if (ctx == null) {
throw new RuntimeException("Cat has not been initialized successfully, "
+ "please call Cal.setup(...) first for each thread.");
} else {
return ctx;
}
}
@Override
public Config getServerConfig() {
return m_serverConfig;
......@@ -151,7 +154,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
@Override
public boolean isCatEnabled() {
return m_domain != null && m_domain.isEnabled();
return m_domain != null && m_domain.isEnabled() && m_context.get() != null;
}
String nextMessageId() {
......@@ -166,7 +169,13 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
@Override
public void setup() {
Context ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
Context ctx;
if (m_domain != null) {
ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
} else {
ctx = new Context("Unknown", m_hostName, "");
}
m_context.set(ctx);
}
......@@ -174,7 +183,14 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
@Override
public void start(Transaction transaction) {
if (Cat.isInitialized()) {
getContext().start(this, transaction);
Context ctx = m_context.get();
if (ctx != null) {
ctx.start(this, transaction);
} else if (m_clientConfig.isDevMode()) {
throw new RuntimeException("Cat has not been initialized successfully, "
+ "please call Cal.setup(...) first for each thread.");
}
} else if (m_firstMessage) {
m_firstMessage = false;
m_logger.warn("CAT client is not enabled because it's not initialized yet");
......
......@@ -26,6 +26,7 @@ import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import com.dianping.cat.Cat;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageTree;
......@@ -95,6 +96,8 @@ public class TcpSocketReceiver implements MessageReceiver, LogEnabled {
@Override
public void onMessage(MessageHandler handler) {
Cat.setup("tcp-socket-receiver");
try {
while (true) {
ChannelBuffer buf = m_queue.poll(1, TimeUnit.MILLISECONDS);
......@@ -110,6 +113,8 @@ public class TcpSocketReceiver implements MessageReceiver, LogEnabled {
}
} catch (InterruptedException e) {
// ignore it
} finally {
Cat.reset();
}
ChannelGroupFuture future = m_channelGroup.close();
......
......@@ -21,5 +21,13 @@ public interface Bucket<T> {
public void initialize(Class<?> type, String name, Date timestamp) throws IOException;;
/**
* store the data by id into the bucket.
*
* @param id
* @param data
* @return true means the data was stored in the bucket, otherwise false.
* @throws IOException
*/
public boolean storeById(String id, T data) throws IOException;;
}
......@@ -8,7 +8,7 @@ import com.dianping.cat.message.spi.MessageTree;
public interface BucketManager {
public void closeBucket(Bucket<?> bucket);
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain) throws IOException;
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain, String namespace) throws IOException;
public Bucket<String> getReportBucket(Date timestamp, String name) throws IOException;
public Bucket<String> getReportBucket(Date timestamp, String name, String namespace) throws IOException;
}
......@@ -7,12 +7,17 @@ import java.util.Map;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultBucketManager extends ContainerHolder implements BucketManager, Disposable {
@Inject
private MessagePathBuilder m_pathBuilder;
private Map<Entry, Bucket<?>> m_map = new HashMap<Entry, Bucket<?>>();
@Override
......@@ -23,34 +28,59 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
// ignore it
}
synchronized (m_map) {
for (Map.Entry<Entry, Bucket<?>> e : m_map.entrySet()) {
if (e.getValue() == bucket) {
m_map.remove(e.getKey());
break;
}
}
}
release(bucket);
}
protected Bucket<?> createBucket(Class<?> type, Date timestamp, String name) throws IOException {
Bucket<?> bucket = lookup(Bucket.class, type.getName());
protected Bucket<?> createBucket(Class<?> type, Date timestamp, String name, String namespace) throws IOException {
try {
Bucket<?> bucket = lookup(Bucket.class, type.getName() + "-" + namespace);
bucket.initialize(type, name, timestamp);
return bucket;
bucket.initialize(type, name, timestamp);
return bucket;
} catch (RuntimeException e) {
e.printStackTrace();
throw e;
}
}
@Override
public void dispose() {
for (Bucket<?> bucket : m_map.values()) {
release(bucket);
synchronized (m_map) {
for (Bucket<?> bucket : m_map.values()) {
release(bucket);
}
}
}
@SuppressWarnings("unchecked")
protected <T> Bucket<T> getBucket(Class<T> type, Date timestamp, String name) throws IOException {
Entry entry = new Entry(type, timestamp, name);
protected <T> Bucket<T> getBucket(Class<T> type, Date timestamp, String name, String namespace) throws IOException {
String path;
if (type == MessageTree.class) {
path = m_pathBuilder.getMessagePath(name, timestamp);
} else {
path = m_pathBuilder.getReportPath(name, timestamp);
}
Entry entry = new Entry(type, path, namespace);
Bucket<?> bucket = m_map.get(entry);
if (bucket == null) {
synchronized (this) {
synchronized (m_map) {
bucket = m_map.get(entry);
if (bucket == null) {
bucket = createBucket(type, timestamp, name);
bucket = createBucket(type, timestamp, name, namespace);
m_map.put(entry, bucket);
}
}
......@@ -60,26 +90,26 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
}
@Override
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain) throws IOException {
return getBucket(MessageTree.class, timestamp, domain);
public Bucket<MessageTree> getMessageBucket(Date timestamp, String domain, String namespace) throws IOException {
return getBucket(MessageTree.class, timestamp, domain, namespace);
}
@Override
public Bucket<String> getReportBucket(Date timestamp, String name) throws IOException {
return getBucket(String.class, timestamp, name);
public Bucket<String> getReportBucket(Date timestamp, String name, String namespace) throws IOException {
return getBucket(String.class, timestamp, name, namespace);
}
static class Entry {
private Class<?> m_type;
private Date m_timestamp;
private String m_path;
private String m_name;
private String m_namespace;
public Entry(Class<?> type, Date timestamp, String name) {
public Entry(Class<?> type, String path, String namespace) {
m_type = type;
m_timestamp = timestamp;
m_name = name;
m_path = path;
m_namespace = namespace;
}
@Override
......@@ -87,31 +117,23 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
if (obj instanceof Entry) {
Entry e = (Entry) obj;
return e.getType() == m_type && e.getTimestamp().getTime() == m_timestamp.getTime()
&& e.getName().equals(m_name);
return e.m_type == m_type && e.m_path.equals(m_path) && e.m_namespace.equals(m_namespace);
}
return false;
}
public String getName() {
return m_name;
}
public Date getTimestamp() {
return m_timestamp;
}
public Class<?> getType() {
return m_type;
}
@Override
public int hashCode() {
int hashcode = m_type.hashCode();
hashcode = hashcode * 31 + m_name.hashCode();
hashcode = hashcode * 31 + m_path.hashCode();
return hashcode;
}
@Override
public String toString() {
return String.format("Entry[type=%s,path=%s]", m_type, m_path);
}
}
}
......@@ -3,6 +3,7 @@
<entity name="config" root="true">
<attribute name="mode" value-type="String" />
<attribute name="enabled" value-type="boolean" />
<attribute name="dev-mode" value-type="boolean" />
<entity-ref name="server" type="list" names="servers" xml-indent="true" />
<entity-ref name="domain" type="list" names="domains" />
<entity-ref name="bind" />
......
......@@ -194,7 +194,7 @@
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String</role-hint>
<role-hint>java.lang.String-local</role-hint>
<implementation>com.dianping.cat.storage.internal.LocalStringBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
......@@ -205,7 +205,7 @@
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree</role-hint>
<role-hint>com.dianping.cat.message.spi.MessageTree-local</role-hint>
<implementation>com.dianping.cat.storage.internal.LocalMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
......@@ -221,6 +221,11 @@
<component>
<role>com.dianping.cat.storage.BucketManager</role>
<implementation>com.dianping.cat.storage.internal.DefaultBucketManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
</components>
</plexus>
......@@ -47,7 +47,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
public void testMessageBucket() throws Exception {
Date timestamp = new Date();
BucketManager manager = lookup(BucketManager.class);
final Bucket<MessageTree> bucket = manager.getMessageBucket(timestamp, "concurrent/message");
final Bucket<MessageTree> bucket = manager.getMessageBucket(timestamp, "concurrent/message", "local");
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int p = 0; p < 10; p++) {
......@@ -112,7 +112,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
public void testStringBucket() throws Exception {
Date timestamp = new Date();
BucketManager manager = lookup(BucketManager.class);
final Bucket<String> bucket = manager.getReportBucket(timestamp, "concurrent/data");
final Bucket<String> bucket = manager.getReportBucket(timestamp, "concurrent/data", "local");
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int p = 0; p < 10; p++) {
......@@ -140,7 +140,7 @@ public class BucketConcurrentTest extends ComponentTestCase {
pool.awaitTermination(5000, TimeUnit.MILLISECONDS);
final Bucket<String> bucket2 = manager.getReportBucket(timestamp, "concurrent/data");
final Bucket<String> bucket2 = manager.getReportBucket(timestamp, "concurrent/data", "local");
for (int p = 0; p < 10; p++) {
final int num = p;
......
......@@ -17,10 +17,10 @@ public class BucketManagerTest extends ComponentTestCase {
public void test() throws Exception {
Date timestamp = new Date();
BucketManager manager = lookup(BucketManager.class);
Bucket<MessageTree> bucket1 = manager.getMessageBucket(timestamp, "test/path1");
Bucket<MessageTree> bucket2 = manager.getMessageBucket(timestamp, "test/path2");
Bucket<MessageTree> bucket3 = manager.getMessageBucket(timestamp, "test/path1");
Bucket<MessageTree> bucket4 = manager.getMessageBucket(timestamp, "test/path2");
Bucket<MessageTree> bucket1 = manager.getMessageBucket(timestamp, "test/path1", "local");
Bucket<MessageTree> bucket2 = manager.getMessageBucket(timestamp, "test/path2", "local");
Bucket<MessageTree> bucket3 = manager.getMessageBucket(timestamp, "test/path1", "local");
Bucket<MessageTree> bucket4 = manager.getMessageBucket(timestamp, "test/path2", "local");
Assert.assertEquals(bucket1, bucket3);
Assert.assertEquals(bucket2, bucket4);
......
<config mode="client" enabled="true" xmlns:xsi="http://www.w3.org/2001/XMLSchema" xsi:noNamespaceSchemaLocation="config.xsd">
<config mode="client" enabled="true" dev-mode="false" xmlns:xsi="http://www.w3.org/2001/XMLSchema" xsi:noNamespaceSchemaLocation="config.xsd">
<servers>
<server ip="192.168.8.21" port="2280" enabled="true"/>
<server ip="192.168.8.22" port="2281" enabled="false"/>
......
......@@ -32,7 +32,7 @@ public class HdfsLogViewService implements ModelService<String> {
ModelResponse<String> response = new ModelResponse<String>();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain());
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain(), "remote");
MessageTree tree = null;
if (tag != null && direction != null) {
......
......@@ -32,7 +32,7 @@ public class LocalLogViewService implements ModelService<String> {
ModelResponse<String> response = new ModelResponse<String>();
try {
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain());
Bucket<MessageTree> bucket = m_bucketManager.getMessageBucket(new Date(id.getTimestamp()), id.getDomain(), "local");
MessageTree tree = null;
if (tag != null && direction != null) {
......@@ -58,6 +58,7 @@ public class LocalLogViewService implements ModelService<String> {
response.setModel(buf.toString(Charset.forName("utf-8")));
}
} catch (Exception e) {
e.printStackTrace();
response.setException(e);
}
......
......@@ -23,7 +23,7 @@ public class HdfsProblemService implements ModelService<ProblemReport> {
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(new Date(date), domain);
bucket = m_bucketManager.getReportBucket(new Date(date), domain, "remote");
String xml = bucket.findById("problem-" + domain);
......
......@@ -23,7 +23,7 @@ public class HdfsTransactionService implements ModelService<TransactionReport> {
Bucket<String> bucket = null;
try {
bucket = m_bucketManager.getReportBucket(new Date(date), domain);
bucket = m_bucketManager.getReportBucket(new Date(date), domain, "remote");
String xml = bucket.findById(domain);
......
......@@ -4,9 +4,12 @@ import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.job.DumpToHdfsConsumer;
import com.dianping.cat.job.hdfs.DefaultInputChannel;
import com.dianping.cat.job.hdfs.DefaultInputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannel;
import com.dianping.cat.job.hdfs.DefaultOutputChannelManager;
import com.dianping.cat.job.hdfs.HdfsMessageStorage;
import com.dianping.cat.job.hdfs.InputChannel;
import com.dianping.cat.job.hdfs.InputChannelManager;
import com.dianping.cat.job.hdfs.OutputChannel;
import com.dianping.cat.job.hdfs.OutputChannelManager;
......@@ -20,8 +23,6 @@ import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.internal.LocalMessageBucket;
import com.dianping.cat.storage.internal.LocalStringBucket;
import com.site.lookup.configuration.AbstractResourceConfigurator;
import com.site.lookup.configuration.Component;
......@@ -36,6 +37,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.config(E("maxSize").value(String.valueOf(2 * 1024 * 1024L))));
all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class));
all.add(C(InputChannel.class, DefaultInputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
all.add(C(InputChannelManager.class, DefaultInputChannelManager.class));
} else {
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") //
......@@ -44,6 +48,10 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.req(MessagePathBuilder.class) //
.config(E("baseDir").value("data"), //
E("serverUri").value("hdfs://192.168.7.43:9000/user/cat/")));
all.add(C(InputChannel.class, DefaultInputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
all.add(C(InputChannelManager.class, DefaultInputChannelManager.class) //
.config(E("serverUri").value("hdfs://192.168.7.43:9000/user/cat/")));
}
all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
......@@ -51,22 +59,13 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) //
.req(MessageStorage.class, "hdfs"));
if (isEnv("dev") || property("env", null) == null) {
all.add(C(Bucket.class, String.class.getName(), LocalStringBucket.class) //
.is(PER_LOOKUP));
all.add(C(Bucket.class, MessageTree.class.getName(), LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text"));
} else {
all.add(C(Bucket.class, String.class.getName(), RemoteStringBucket.class) //
.is(PER_LOOKUP) //
.req(ReportDao.class));
all.add(C(Bucket.class, MessageTree.class.getName(), RemoteMessageBucket.class) //
.is(PER_LOOKUP) //
.req(LogviewDao.class, MessagePathBuilder.class) //
.req(OutputChannelManager.class, InputChannelManager.class) //
.req(MessageCodec.class, "plain-text"));
}
all.add(C(Bucket.class, String.class.getName() + "-remote", RemoteStringBucket.class) //
.is(PER_LOOKUP) //
.req(ReportDao.class));
all.add(C(Bucket.class, MessageTree.class.getName() + "-remote", RemoteMessageBucket.class) //
.is(PER_LOOKUP) //
.req(OutputChannelManager.class, InputChannelManager.class) //
.req(LogviewDao.class, MessagePathBuilder.class));
all.addAll(new DatabaseConfigurator().defineComponents());
......
......@@ -24,6 +24,9 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
@Inject
private URI m_serverUri;
@Inject
private String m_baseDir = "target/hdfs";
private FileSystem m_fs;
private Map<String, DefaultInputChannel> m_channels = new HashMap<String, DefaultInputChannel>();
......@@ -94,7 +97,7 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
DefaultInputChannel channel = m_channels.get(path);
if (channel == null) {
Path file = new Path(path);
Path file = new Path(m_baseDir, path);
FSDataInputStream in = m_fs.open(file);
channel = (DefaultInputChannel) lookup(InputChannel.class);
......@@ -106,6 +109,10 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
return channel;
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
public void setServerUri(String serverUri) {
m_serverUri = URI.create(serverUri);
}
......
......@@ -76,6 +76,7 @@ public class DefaultOutputChannel implements OutputChannel {
// a blank line used to separate two message trees
m_out.write('\n');
m_out.flush();
m_count += length + 1;
return length+1;
......
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -9,6 +8,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.logging.LogEnabled;
......@@ -21,7 +21,8 @@ import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled {
public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable,
LogEnabled {
@Inject
private MessagePathBuilder m_builder;
......@@ -104,16 +105,16 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp
@Override
public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException {
String path = m_builder.getHdfsPath(tree.getMessageId());
return openChannel(path, forceNew);
}
}
public OutputChannel openChannel(String path, boolean forceNew) throws IOException {
OutputChannel channel = m_channels.get(path);
if (channel == null) {
Path file = new Path(m_basePath, path + "-0");
OutputStream out = m_fs.create(file);
Path file = new Path(m_basePath, path);
FSDataOutputStream out = m_fs.create(file);
channel = lookup(OutputChannel.class);
channel.initialize(out);
......@@ -127,7 +128,7 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp
m_indexes.put(path, ++index);
Path file = new Path(m_basePath, path + "-" + index);
OutputStream out = m_fs.create(file);
FSDataOutputStream out = m_fs.create(file);
channel = lookup(OutputChannel.class);
channel.initialize(out);
......
......@@ -4,6 +4,9 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -38,6 +41,15 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
private String m_path;
private Map<String, String> m_lruCache = new LinkedHashMap<String, String>(100, 0.75f, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Entry<String, String> eldest) {
return size() > 100;
}
};
private Logger m_logger;
@Override
......@@ -147,34 +159,32 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
@Override
public boolean storeById(String id, MessageTree tree) throws IOException {
// check if it's already stored
try {
m_logviewDao.findByPK(id, LogviewEntity.READSET_FULL);
String messageId = tree.getMessageId();
if (m_lruCache.containsKey(messageId)) {
return false;
} catch (DalException e) {
// not exist
}
m_lruCache.put(messageId, messageId);
int offset = m_outputChannel.getSize();
int length = m_outputChannel.write(tree);
Logview logview = m_logviewDao.createLocal();
logview.setMessageId(tree.getMessageId());
logview.setMessageId(messageId);
logview.setDataPath(m_path);
logview.setDataOffset(offset);
logview.setDataLength(length);
logview.setTagThread("t:" + tree.getThreadId());
logview.setTagSession("s:" + tree.getSessionToken());
logview.setTagRequest("r:" + tree.getMessageId());
logview.setTagRequest("r:" + messageId);
try {
m_logviewDao.insert(logview);
return true;
} catch (DalException e) {
throw new IOException("Error when inserting into logiew table!", e);
throw new IOException("Error when inserting into logview table!", e);
}
}
}
......@@ -23,6 +23,21 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.job.hdfs.InputChannel</role>
<implementation>com.dianping.cat.job.hdfs.DefaultInputChannel</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.job.hdfs.InputChannelManager</role>
<implementation>com.dianping.cat.job.hdfs.DefaultInputChannelManager</implementation>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>hdfs</role-hint>
......@@ -46,19 +61,32 @@
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String</role-hint>
<implementation>com.dianping.cat.storage.internal.LocalStringBucket</implementation>
<role-hint>java.lang.String-remote</role-hint>
<implementation>com.dianping.cat.job.storage.RemoteStringBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.job.sql.dal.ReportDao</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree</role-hint>
<implementation>com.dianping.cat.storage.internal.LocalMessageBucket</implementation>
<role-hint>com.dianping.cat.message.spi.MessageTree-remote</role-hint>
<implementation>com.dianping.cat.job.storage.RemoteMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.job.hdfs.InputChannelManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.job.sql.dal.LogviewDao</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
......
......@@ -53,7 +53,7 @@
<dependency>
<groupId>com.site.common</groupId>
<artifactId>web-framework</artifactId>
<version>1.0.4</version>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>org.unidal.webres</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册