From d6a7aa7993dd3f97a44d33709cad82e48e222f9c Mon Sep 17 00:00:00 2001 From: Frankie Wu Date: Sat, 31 Mar 2012 14:07:22 +0800 Subject: [PATCH] local message bucket --- .../build/StorageComponentConfigurator.java | 5 + .../storage/message/LocalLogviewBucket.java | 52 +++---- .../storage/message/LocalMessageBucket.java | 127 ++++++++++++++++++ .../resources/META-INF/plexus/components.xml | 15 +++ 4 files changed, 165 insertions(+), 34 deletions(-) create mode 100644 cat-core/src/main/java/com/dianping/cat/storage/message/LocalMessageBucket.java diff --git a/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java b/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java index 9519ddef7..17be0233c 100644 --- a/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java +++ b/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java @@ -34,6 +34,11 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator { .is(PER_LOOKUP) // .req(MessagePathBuilder.class) // .req(MessageCodec.class, "plain-text")); + all.add(C(Bucket.class, MessageTree.class.getName() + "-message", + com.dianping.cat.storage.message.LocalMessageBucket.class) // + .is(PER_LOOKUP) // + .req(MessagePathBuilder.class) // + .req(MessageCodec.class, "plain-text")); return all; } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java index 52abba392..2b5da42cd 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java @@ -22,10 +22,6 @@ import org.codehaus.plexus.logging.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import com.dianping.cat.Cat; -import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; -import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessageTree; @@ -266,47 +262,35 @@ public class LocalLogviewBucket implements Bucket, LogEnabled { List tags = prepareTags(tree); - MessageProducer cat = Cat.getProducer(); - Transaction t = cat.newTransaction("Bucket", getClass().getSimpleName()); ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); - try { - m_codec.encode(tree, buf); + m_codec.encode(tree, buf); - int length = buf.readInt(); - byte[] num = String.valueOf(length).getBytes("utf-8"); - long offset = m_writeDataFileLength; + int length = buf.readInt(); + byte[] num = String.valueOf(length).getBytes("utf-8"); + long offset = m_writeDataFileLength; - m_writeLock.lock(); + m_writeLock.lock(); - try { - m_writeDataFile.write(num); - m_writeDataFile.write('\n'); - m_writeDataFile.write(buf.array(), buf.readerIndex(), length); - m_writeDataFile.write('\n'); - m_writeDataFileLength += num.length + 1 + length + 1; + try { + m_writeDataFile.write(num); + m_writeDataFile.write('\n'); + m_writeDataFile.write(buf.array(), buf.readerIndex(), length); + m_writeDataFile.write('\n'); + m_writeDataFileLength += num.length + 1 + length + 1; - String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n'; - byte[] data = line.getBytes("utf-8"); + String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n'; + byte[] data = line.getBytes("utf-8"); - m_writeIndexFile.write(data); - m_dirty.set(true); + m_writeIndexFile.write(data); + m_dirty.set(true); - updateIndex(id, offset, tags); + updateIndex(id, offset, tags); - t.setStatus(Message.SUCCESS); - return true; - } catch (Exception e) { - cat.logError(e); - t.setStatus(e); - } finally { - m_writeLock.unlock(); - } + return true; } finally { - t.complete(); + m_writeLock.unlock(); } - - return false; } protected void updateIndex(String id, long offset, List tags) { diff --git a/cat-core/src/main/java/com/dianping/cat/storage/message/LocalMessageBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalMessageBucket.java new file mode 100644 index 000000000..4ab8cab93 --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalMessageBucket.java @@ -0,0 +1,127 @@ +package com.dianping.cat.storage.message; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import com.dianping.cat.message.spi.MessageCodec; +import com.dianping.cat.message.spi.MessagePathBuilder; +import com.dianping.cat.message.spi.MessageTree; +import com.dianping.cat.storage.Bucket; +import com.site.lookup.annotation.Inject; + +public class LocalMessageBucket implements Bucket { + @Inject + private MessageCodec m_codec; + + @Inject + private MessagePathBuilder m_pathBuilder; + + @Inject + private String m_baseDir = "target/bucket"; + + private ReentrantLock m_writeLock; + + private OutputStream m_writeDataFile; + + @Override + public void close() throws IOException { + m_writeLock.lock(); + + try { + m_writeDataFile.close(); + } catch (Exception e) { + // ignore it + } finally { + m_writeLock.unlock(); + } + } + + @Override + public void deleteAndCreate() throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public List findAllById(String id) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public MessageTree findById(String id) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public MessageTree findNextById(String id, String tag) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public MessageTree findPreviousById(String id, String tag) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public void flush() throws IOException { + m_writeLock.lock(); + + try { + m_writeDataFile.flush(); + } finally { + m_writeLock.lock(); + } + } + + @Override + public Collection getIdsByPrefix(String tag) { + throw new UnsupportedOperationException("Not supported by local logview bucket!"); + } + + @Override + public void initialize(Class type, String domain, Date timestamp) throws IOException { + m_writeLock = new ReentrantLock(); + + String logicalPath = m_pathBuilder.getMessagePath(domain, timestamp); + File dataFile = new File(m_baseDir, logicalPath); + + dataFile.getParentFile().mkdirs(); + m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile), 8192); + } + + public void setBaseDir(String baseDir) { + m_baseDir = baseDir; + } + + @Override + public boolean storeById(String id, MessageTree tree) throws IOException { + ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); + + m_codec.encode(tree, buf); + + int length = buf.readInt(); + byte[] num = String.valueOf(length).getBytes("utf-8"); + + m_writeLock.lock(); + + try { + m_writeDataFile.write(num); + m_writeDataFile.write('\n'); + m_writeDataFile.write(buf.array(), buf.readerIndex(), length); + m_writeDataFile.write('\n'); + + return true; + } finally { + m_writeLock.unlock(); + } + } +} diff --git a/cat-core/src/main/resources/META-INF/plexus/components.xml b/cat-core/src/main/resources/META-INF/plexus/components.xml index 457f331d4..635f190bf 100644 --- a/cat-core/src/main/resources/META-INF/plexus/components.xml +++ b/cat-core/src/main/resources/META-INF/plexus/components.xml @@ -285,5 +285,20 @@ + + com.dianping.cat.storage.Bucket + com.dianping.cat.message.spi.MessageTree-message + com.dianping.cat.storage.message.LocalMessageBucket + per-lookup + + + com.dianping.cat.message.spi.MessagePathBuilder + + + com.dianping.cat.message.spi.MessageCodec + plain-text + + + -- GitLab