提交 d6a7aa79 编写于 作者: F Frankie Wu

local message bucket

上级 fa4c70d9
......@@ -34,6 +34,11 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator {
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, MessageTree.class.getName() + "-message",
com.dianping.cat.storage.message.LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
return all;
}
......
......@@ -22,10 +22,6 @@ import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
......@@ -266,47 +262,35 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
List<String> tags = prepareTags(tree);
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction("Bucket", getClass().getSimpleName());
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
try {
m_codec.encode(tree, buf);
m_codec.encode(tree, buf);
int length = buf.readInt();
byte[] num = String.valueOf(length).getBytes("utf-8");
long offset = m_writeDataFileLength;
int length = buf.readInt();
byte[] num = String.valueOf(length).getBytes("utf-8");
long offset = m_writeDataFileLength;
m_writeLock.lock();
m_writeLock.lock();
try {
m_writeDataFile.write(num);
m_writeDataFile.write('\n');
m_writeDataFile.write(buf.array(), buf.readerIndex(), length);
m_writeDataFile.write('\n');
m_writeDataFileLength += num.length + 1 + length + 1;
try {
m_writeDataFile.write(num);
m_writeDataFile.write('\n');
m_writeDataFile.write(buf.array(), buf.readerIndex(), length);
m_writeDataFile.write('\n');
m_writeDataFileLength += num.length + 1 + length + 1;
String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n';
byte[] data = line.getBytes("utf-8");
String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n';
byte[] data = line.getBytes("utf-8");
m_writeIndexFile.write(data);
m_dirty.set(true);
m_writeIndexFile.write(data);
m_dirty.set(true);
updateIndex(id, offset, tags);
updateIndex(id, offset, tags);
t.setStatus(Message.SUCCESS);
return true;
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
} finally {
m_writeLock.unlock();
}
return true;
} finally {
t.complete();
m_writeLock.unlock();
}
return false;
}
protected void updateIndex(String id, long offset, List<String> tags) {
......
package com.dianping.cat.storage.message;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.lookup.annotation.Inject;
public class LocalMessageBucket implements Bucket<MessageTree> {
@Inject
private MessageCodec m_codec;
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private String m_baseDir = "target/bucket";
private ReentrantLock m_writeLock;
private OutputStream m_writeDataFile;
@Override
public void close() throws IOException {
m_writeLock.lock();
try {
m_writeDataFile.close();
} catch (Exception e) {
// ignore it
} finally {
m_writeLock.unlock();
}
}
@Override
public void deleteAndCreate() throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
}
@Override
public List<String> findAllById(String id) throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
}
@Override
public MessageTree findById(String id) throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
}
@Override
public MessageTree findNextById(String id, String tag) throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
}
@Override
public MessageTree findPreviousById(String id, String tag) throws IOException {
throw new UnsupportedOperationException("Not supported by local message bucket!");
}
@Override
public void flush() throws IOException {
m_writeLock.lock();
try {
m_writeDataFile.flush();
} finally {
m_writeLock.lock();
}
}
@Override
public Collection<String> getIdsByPrefix(String tag) {
throw new UnsupportedOperationException("Not supported by local logview bucket!");
}
@Override
public void initialize(Class<?> type, String domain, Date timestamp) throws IOException {
m_writeLock = new ReentrantLock();
String logicalPath = m_pathBuilder.getMessagePath(domain, timestamp);
File dataFile = new File(m_baseDir, logicalPath);
dataFile.getParentFile().mkdirs();
m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile), 8192);
}
public void setBaseDir(String baseDir) {
m_baseDir = baseDir;
}
@Override
public boolean storeById(String id, MessageTree tree) throws IOException {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
m_codec.encode(tree, buf);
int length = buf.readInt();
byte[] num = String.valueOf(length).getBytes("utf-8");
m_writeLock.lock();
try {
m_writeDataFile.write(num);
m_writeDataFile.write('\n');
m_writeDataFile.write(buf.array(), buf.readerIndex(), length);
m_writeDataFile.write('\n');
return true;
} finally {
m_writeLock.unlock();
}
}
}
......@@ -285,5 +285,20 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>com.dianping.cat.message.spi.MessageTree-message</role-hint>
<implementation>com.dianping.cat.storage.message.LocalMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册