diff --git a/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java b/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java index 9519ddef731c7a424af432175607532e398f18c6..17be0233c4c841d58b2c05f0d03fd058e619f812 100644 --- a/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java +++ b/cat-core/src/main/java/com/dianping/cat/build/StorageComponentConfigurator.java @@ -34,6 +34,11 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator { .is(PER_LOOKUP) // .req(MessagePathBuilder.class) // .req(MessageCodec.class, "plain-text")); + all.add(C(Bucket.class, MessageTree.class.getName() + "-message", + com.dianping.cat.storage.message.LocalMessageBucket.class) // + .is(PER_LOOKUP) // + .req(MessagePathBuilder.class) // + .req(MessageCodec.class, "plain-text")); return all; } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java index 52abba39277fa5812b5648a031a64eb74110fd74..2b5da42cd4077461f8353539993d94cd3ee1f447 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalLogviewBucket.java @@ -22,10 +22,6 @@ import org.codehaus.plexus.logging.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import com.dianping.cat.Cat; -import com.dianping.cat.message.Message; -import com.dianping.cat.message.MessageProducer; -import com.dianping.cat.message.Transaction; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessageTree; @@ -266,47 +262,35 @@ public class LocalLogviewBucket implements Bucket, LogEnabled { List tags = prepareTags(tree); - MessageProducer cat = Cat.getProducer(); - Transaction t = cat.newTransaction("Bucket", getClass().getSimpleName()); ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); - try { - m_codec.encode(tree, buf); + m_codec.encode(tree, buf); - int length = buf.readInt(); - byte[] num = String.valueOf(length).getBytes("utf-8"); - long offset = m_writeDataFileLength; + int length = buf.readInt(); + byte[] num = String.valueOf(length).getBytes("utf-8"); + long offset = m_writeDataFileLength; - m_writeLock.lock(); + m_writeLock.lock(); - try { - m_writeDataFile.write(num); - m_writeDataFile.write('\n'); - m_writeDataFile.write(buf.array(), buf.readerIndex(), length); - m_writeDataFile.write('\n'); - m_writeDataFileLength += num.length + 1 + length + 1; + try { + m_writeDataFile.write(num); + m_writeDataFile.write('\n'); + m_writeDataFile.write(buf.array(), buf.readerIndex(), length); + m_writeDataFile.write('\n'); + m_writeDataFileLength += num.length + 1 + length + 1; - String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n'; - byte[] data = line.getBytes("utf-8"); + String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n'; + byte[] data = line.getBytes("utf-8"); - m_writeIndexFile.write(data); - m_dirty.set(true); + m_writeIndexFile.write(data); + m_dirty.set(true); - updateIndex(id, offset, tags); + updateIndex(id, offset, tags); - t.setStatus(Message.SUCCESS); - return true; - } catch (Exception e) { - cat.logError(e); - t.setStatus(e); - } finally { - m_writeLock.unlock(); - } + return true; } finally { - t.complete(); + m_writeLock.unlock(); } - - return false; } protected void updateIndex(String id, long offset, List tags) { diff --git a/cat-core/src/main/java/com/dianping/cat/storage/message/LocalMessageBucket.java b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalMessageBucket.java new file mode 100644 index 0000000000000000000000000000000000000000..4ab8cab93333a804cfffbe51fd24ac6b8345ed02 --- /dev/null +++ b/cat-core/src/main/java/com/dianping/cat/storage/message/LocalMessageBucket.java @@ -0,0 +1,127 @@ +package com.dianping.cat.storage.message; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import com.dianping.cat.message.spi.MessageCodec; +import com.dianping.cat.message.spi.MessagePathBuilder; +import com.dianping.cat.message.spi.MessageTree; +import com.dianping.cat.storage.Bucket; +import com.site.lookup.annotation.Inject; + +public class LocalMessageBucket implements Bucket { + @Inject + private MessageCodec m_codec; + + @Inject + private MessagePathBuilder m_pathBuilder; + + @Inject + private String m_baseDir = "target/bucket"; + + private ReentrantLock m_writeLock; + + private OutputStream m_writeDataFile; + + @Override + public void close() throws IOException { + m_writeLock.lock(); + + try { + m_writeDataFile.close(); + } catch (Exception e) { + // ignore it + } finally { + m_writeLock.unlock(); + } + } + + @Override + public void deleteAndCreate() throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public List findAllById(String id) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public MessageTree findById(String id) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public MessageTree findNextById(String id, String tag) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public MessageTree findPreviousById(String id, String tag) throws IOException { + throw new UnsupportedOperationException("Not supported by local message bucket!"); + } + + @Override + public void flush() throws IOException { + m_writeLock.lock(); + + try { + m_writeDataFile.flush(); + } finally { + m_writeLock.lock(); + } + } + + @Override + public Collection getIdsByPrefix(String tag) { + throw new UnsupportedOperationException("Not supported by local logview bucket!"); + } + + @Override + public void initialize(Class type, String domain, Date timestamp) throws IOException { + m_writeLock = new ReentrantLock(); + + String logicalPath = m_pathBuilder.getMessagePath(domain, timestamp); + File dataFile = new File(m_baseDir, logicalPath); + + dataFile.getParentFile().mkdirs(); + m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile), 8192); + } + + public void setBaseDir(String baseDir) { + m_baseDir = baseDir; + } + + @Override + public boolean storeById(String id, MessageTree tree) throws IOException { + ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192); + + m_codec.encode(tree, buf); + + int length = buf.readInt(); + byte[] num = String.valueOf(length).getBytes("utf-8"); + + m_writeLock.lock(); + + try { + m_writeDataFile.write(num); + m_writeDataFile.write('\n'); + m_writeDataFile.write(buf.array(), buf.readerIndex(), length); + m_writeDataFile.write('\n'); + + return true; + } finally { + m_writeLock.unlock(); + } + } +} diff --git a/cat-core/src/main/resources/META-INF/plexus/components.xml b/cat-core/src/main/resources/META-INF/plexus/components.xml index 457f331d414be8c581ef6481f585fcc66776a068..635f190bfed7b50c8da041006676e9f5d40e0ce9 100644 --- a/cat-core/src/main/resources/META-INF/plexus/components.xml +++ b/cat-core/src/main/resources/META-INF/plexus/components.xml @@ -285,5 +285,20 @@ + + com.dianping.cat.storage.Bucket + com.dianping.cat.message.spi.MessageTree-message + com.dianping.cat.storage.message.LocalMessageBucket + per-lookup + + + com.dianping.cat.message.spi.MessagePathBuilder + + + com.dianping.cat.message.spi.MessageCodec + plain-text + + +