提交 dd622f76 编写于 作者: F Frankie Wu

make Bucket impl thread-safe

上级 8daf9f2d
......@@ -8,6 +8,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
......@@ -20,7 +23,7 @@ import com.site.helper.Joiners;
import com.site.helper.Splitters;
import com.site.lookup.annotation.Inject;
public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T>, LogEnabled {
public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSupport<T>, LogEnabled {
private static final String[] EMPTY = new String[0];
@Inject
......@@ -38,14 +41,22 @@ public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T
private Logger m_logger;
private ReadLock m_readLock;
private WriteLock m_writeLock;
@Override
public void close() {
m_writeLock.lock();
try {
m_out.close();
m_idToOffsets.clear();
m_tagToIds.clear();
} catch (IOException e) {
// ignore it
} finally {
m_writeLock.unlock();
}
}
......@@ -85,6 +96,8 @@ public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T
Long offset = m_idToOffsets.get(id);
if (offset != null) {
m_readLock.lock();
try {
long old = m_out.getFilePointer();
......@@ -104,6 +117,8 @@ public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T
return data;
} catch (Exception e) {
m_logger.error(String.format("Error when reading file(%s)!", m_file), e);
} finally {
m_readLock.unlock();
}
}
......@@ -138,49 +153,58 @@ public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T
@Override
public void initialize(Class<?> type, String path) throws IOException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
m_readLock = lock.readLock();
m_writeLock = lock.writeLock();
m_file = new File(m_baseDir, path);
m_file.getParentFile().mkdirs();
m_out = new RandomAccessFile(m_file, "rw");
if (m_file.exists()) {
loadIndexes();
m_out.seek(m_out.length());
}
}
protected void loadIndexes() throws IOException {
byte[] data = new byte[8192];
while (true) {
long offset = m_out.getFilePointer();
String first = m_out.readLine();
m_writeLock.lock();
if (first == null) { // EOF
break;
}
try {
while (true) {
long offset = m_out.getFilePointer();
String first = m_out.readLine();
int num = Integer.parseInt(m_out.readLine());
if (first == null) { // EOF
break;
}
if (num > data.length) {
int newSize = data.length;
int num = Integer.parseInt(m_out.readLine());
while (newSize < num) {
newSize += newSize / 2;
}
if (num > data.length) {
int newSize = data.length;
data = new byte[newSize];
}
while (newSize < num) {
newSize += newSize / 2;
}
data = new byte[newSize];
}
m_out.readFully(data, 0, num); // get rid of it
m_out.readLine(); // get rid of empty line
m_out.readFully(data, 0, num); // get rid of it
m_out.readLine(); // get rid of empty line
List<String> parts = Splitters.by('\t').split(first);
if (parts.size() > 0) {
String id = parts.get(0);
List<String> parts = Splitters.by('\t').split(first);
if (parts.size() > 0) {
String id = parts.get(0);
parts.remove(0);
updateIndex(id, parts.toArray(EMPTY), offset);
parts.remove(0);
updateIndex(id, parts.toArray(EMPTY), offset);
}
}
} finally {
m_writeLock.unlock();
}
}
......@@ -200,16 +224,27 @@ public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T
*/
@Override
public boolean storeById(String id, T data, String... tags) {
try {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n";
byte[] first;
byte[] num;
int length;
try {
encode(data, buf);
int length = buf.readInt();
String attributes = id + "\t" + Joiners.by('\t').join(tags) + "\n";
length = buf.readInt();
first = attributes.getBytes("utf-8");
num = String.valueOf(length).getBytes("utf-8");
} catch (Exception e) {
m_logger.error(String.format("Error when preparing to write to file(%s)!", m_file), e);
return false;
}
m_writeLock.lock();
byte[] first = attributes.getBytes("utf-8");
byte[] num = String.valueOf(length).getBytes("utf-8");
try {
long offset = m_out.getFilePointer();
m_out.write(first);
......@@ -226,6 +261,8 @@ public abstract class AbstractBucket<T> implements Bucket<T>, TagThreadSupport<T
m_logger.error(String.format("Error when writing to file(%s)!", m_file), e);
return false;
} finally {
m_writeLock.unlock();
}
}
......
......@@ -5,7 +5,7 @@ import java.nio.charset.Charset;
import org.jboss.netty.buffer.ChannelBuffer;
public class DefaultBucket<T> extends AbstractBucket<T> {
public class DefaultBucket<T> extends AbstractFileBucket<T> {
private Class<?> m_type;
@SuppressWarnings("unchecked")
......
......@@ -10,7 +10,7 @@ import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.MessageBucket;
import com.site.lookup.annotation.Inject;
public class DefaultMessageBucket extends AbstractBucket<MessageTree> implements MessageBucket {
public class DefaultMessageBucket extends AbstractFileBucket<MessageTree> implements MessageBucket {
@Inject
private MessageCodec m_codec;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册