提交 0e2798c8 编写于 作者: F Frankie Wu

Merge branch 'master' of ssh://192.168.8.22:58422/cat

...@@ -63,6 +63,8 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -63,6 +63,8 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
private Logger m_logger; private Logger m_logger;
private String m_logicalPath;
@Override @Override
public void close() throws IOException { public void close() throws IOException {
m_writeLock.lock(); m_writeLock.lock();
...@@ -81,7 +83,8 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -81,7 +83,8 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
@Override @Override
public void deleteAndCreate() throws IOException { public void deleteAndCreate() throws IOException {
throw new UnsupportedOperationException("Not supported by local logview bucket!"); new File(m_baseDir, m_logicalPath).delete();
new File(m_baseDir, m_logicalPath + ".idx").delete();
} }
@Override @Override
...@@ -101,13 +104,12 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -101,13 +104,12 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
if (offset != null) { if (offset != null) {
m_readLock.lock(); m_readLock.lock();
if (m_dirty.get()) {
flush(); // flush first if any read requesting
}
try { try {
if (m_dirty.get()) {
flush(); // flush first if any read requesting
}
m_readDataFile.seek(offset); m_readDataFile.seek(offset);
m_readDataFile.readLine(); // first line is header, get rid of it
int num = Integer.parseInt(m_readDataFile.readLine()); int num = Integer.parseInt(m_readDataFile.readLine());
byte[] bytes = new byte[num]; byte[] bytes = new byte[num];
...@@ -175,7 +177,7 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -175,7 +177,7 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
m_writeIndexFile.flush(); m_writeIndexFile.flush();
} finally { } finally {
m_dirty.set(false); m_dirty.set(false);
m_writeLock.lock(); m_writeLock.unlock();
} }
} }
...@@ -196,9 +198,10 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -196,9 +198,10 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
dataFile.getParentFile().mkdirs(); dataFile.getParentFile().mkdirs();
m_logicalPath = logicalPath;
m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192);
m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192);
m_readDataFile = new RandomAccessFile(dataFile, "r"); m_readDataFile = new RandomAccessFile(dataFile, "r");
m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile), 8192);
m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile), 8192);
if (indexFile.exists()) { if (indexFile.exists()) {
loadIndexes(indexFile); loadIndexes(indexFile);
...@@ -256,6 +259,8 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -256,6 +259,8 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
@Override @Override
public boolean storeById(String id, MessageTree tree) throws IOException { public boolean storeById(String id, MessageTree tree) throws IOException {
m_writeLock.lock();
if (m_idToOffsets.containsKey(id)) { if (m_idToOffsets.containsKey(id)) {
return false; return false;
} }
...@@ -268,20 +273,18 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -268,20 +273,18 @@ public class LocalLogviewBucket implements Bucket<MessageTree>, LogEnabled {
int length = buf.readInt(); int length = buf.readInt();
byte[] num = String.valueOf(length).getBytes("utf-8"); byte[] num = String.valueOf(length).getBytes("utf-8");
long offset = m_writeDataFileLength;
m_writeLock.lock();
try { try {
m_writeDataFile.write(num); m_writeDataFile.write(num);
m_writeDataFile.write('\n'); m_writeDataFile.write('\n');
m_writeDataFile.write(buf.array(), buf.readerIndex(), length); m_writeDataFile.write(buf.array(), buf.readerIndex(), length);
m_writeDataFile.write('\n'); m_writeDataFile.write('\n');
m_writeDataFileLength += num.length + 1 + length + 1;
long offset = m_writeDataFileLength;
String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n'; String line = id + '\t' + offset + '\t' + Joiners.by('\t').join(tags) + '\n';
byte[] data = line.getBytes("utf-8"); byte[] data = line.getBytes("utf-8");
m_writeDataFileLength += num.length + 1 + length + 1;
m_writeIndexFile.write(data); m_writeIndexFile.write(data);
m_dirty.set(true); m_dirty.set(true);
......
package com.dianping.cat.storage.message;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.lookup.ComponentTestCase;
public class LocalLogviewBucketTest extends ComponentTestCase {
protected final static int threadNum = 10;// notice: max 9, for creating asc order id bellow
protected final static int timesPerThread = 1000; // notice: must be powers 10, fro creating asc order id bellow
protected void printFails(final int fails, final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " fails:" + fails + " waste:" + (System.currentTimeMillis() - start) + "ms");
if (fails > 0) {
Assert.fail("fails:" + fails);
}
}
protected void print(final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " waste:" + (System.currentTimeMillis() - start) + "ms");
}
protected void resetSerial(final AtomicInteger serial) {
serial.set(10 * timesPerThread);
}
protected AtomicInteger createSerial() {
return new AtomicInteger(10 * timesPerThread);
}
final ExecutorService pool = Executors.newFixedThreadPool(threadNum);
protected void submit(Runnable run) {
for (int p = 0; p < threadNum; p++) {
pool.submit(run);
}
}
protected CountDownLatch createLatch() {
return new CountDownLatch(threadNum);
}
Bucket<MessageTree> bucket = null;
@SuppressWarnings("unchecked")
@Before
public void setUp() throws IOException {
try {
super.setUp();
} catch (Exception e1) {
e1.printStackTrace();
}
try {
bucket = lookup(Bucket.class, MessageTree.class.getName() + "-logview");
bucket.initialize(null, "cat", new Date());
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void tearDown() throws Exception {
super.tearDown();
bucket.close();
bucket.deleteAndCreate();
}
@Test
public void testConcurrentRead() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentReadWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
Assert.assertTrue(bucket.storeById(id, mt));
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
try {
String id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
boolean success = bucket.storeById(id, mt);
if (!success) {
fail.incrementAndGet();
}
} catch (Throwable e) {
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
resetSerial(serial);
this.serialRead(serial);
}
@Test
public void testSerialRead() throws Exception {
final AtomicInteger serial = createSerial();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
serialRead(serial);
print(start);
}
@Test
public void testSerialWrite() throws Exception {
final AtomicInteger serial = createSerial();
long start = System.currentTimeMillis();
this.serialWrite(serial);
print(start);
resetSerial(serial);
this.serialRead(serial);
}
private void serialRead(final AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
}
}
}
private void serialWrite(AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
Assert.assertTrue(bucket.storeById(id, mt));
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册