提交 ac6d5d65 编写于 作者: S sean.wang

more cked test case

上级 d58e2fed
......@@ -21,24 +21,28 @@ class StorageComponentConfigurator extends AbstractResourceConfigurator {
List<Component> all = new ArrayList<Component>();
all.add(C(Bucket.class, String.class.getName() + "-local", LocalStringBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
all.add(C(Bucket.class, MessageTree.class.getName() + "-local", LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(BucketManager.class, DefaultBucketManager.class) //
.req(MessagePathBuilder.class));
.req(MessagePathBuilder.class));
all.add(C(Bucket.class, MessageTree.class.getName() + "-logview", LocalLogviewBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, MessageTree.class.getName() + "-message",
com.dianping.cat.storage.message.LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, MessageTree.class.getName() + "-message",//
com.dianping.cat.storage.message.LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, "plain-text"));
all.add(C(Bucket.class, String.class.getName() + "-report",//
com.dianping.cat.storage.report.LocalReportBucket.class) //
.is(PER_LOOKUP) //
.req(MessagePathBuilder.class));
return all;
}
......
......@@ -300,5 +300,16 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.Bucket</role>
<role-hint>java.lang.String-report</role-hint>
<implementation>com.dianping.cat.storage.report.LocalReportBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
</requirements>
</component>
</components>
</plexus>
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.ComponentTestCase;
public abstract class MesageTreeBucketTestCase extends ComponentTestCase {
protected final static int threadNum = 10;// notice: max 9, for creating asc order id bellow
protected final static int timesPerThread = 1000; // notice: must be powers 10, fro creating asc order id bellow
protected ExecutorService pool = null;
Bucket<MessageTree> bucket = null;
protected void printFails(final int fails, final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " fails:" + fails + " waste:" + (System.currentTimeMillis() - start) + "ms");
if (fails > 0) {
Assert.fail("fails:" + fails);
}
}
protected void print(final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " waste:" + (System.currentTimeMillis() - start) + "ms");
}
protected void resetSerial(final AtomicInteger serial) {
serial.set(10 * timesPerThread);
}
protected AtomicInteger createSerial() {
return new AtomicInteger(10 * timesPerThread);
}
protected void submit(Runnable run) {
for (int p = 0; p < threadNum; p++) {
pool.submit(run);
}
}
protected CountDownLatch createLatch() {
return new CountDownLatch(threadNum);
}
@Before
public void setUp() throws IOException {
try {
super.setUp();
} catch (Exception e1) {
e1.printStackTrace();
}
try {
pool = Executors.newFixedThreadPool(threadNum);
bucket = createBucket();
} catch (Exception e) {
e.printStackTrace();
}
}
protected abstract Bucket<MessageTree> createBucket() throws Exception;
@After
public void tearDown() throws Exception {
super.tearDown();
bucket.close();
bucket.deleteAndCreate();
}
@Test
public void testReload() throws Exception {
final AtomicInteger serial = createSerial();
this.serialWrite(serial);
resetSerial(serial);
this.bucket.close();
long start = System.currentTimeMillis();
bucket = createBucket();
print(start);
serialRead(serial);
}
@Test
public void testConcurrentRead() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentReadWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
Assert.assertTrue(bucket.storeById(id, mt));
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
try {
String id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
boolean success = bucket.storeById(id, mt);
if (!success) {
fail.incrementAndGet();
}
} catch (Throwable e) {
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
resetSerial(serial);
this.serialRead(serial);
}
@Test
public void testSerialRead() throws Exception {
final AtomicInteger serial = createSerial();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
serialRead(serial);
print(start);
}
@Test
public void testSerialWrite() throws Exception {
final AtomicInteger serial = createSerial();
long start = System.currentTimeMillis();
this.serialWrite(serial);
print(start);
resetSerial(serial);
this.serialRead(serial);
}
private void serialRead(final AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
}
}
}
private void serialWrite(AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
Assert.assertTrue(bucket.storeById(id, mt));
}
}
}
}
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.site.lookup.ComponentTestCase;
public abstract class StringBucketTestCase extends ComponentTestCase {
protected final static int threadNum = 10;// notice: max 9, for creating asc order id bellow
protected final static int timesPerThread = 1000; // notice: must be powers 10, fro creating asc order id bellow
protected ExecutorService pool = null;
Bucket<String> bucket = null;
protected void printFails(final int fails, final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " fails:" + fails + " waste:" + (System.currentTimeMillis() - start) + "ms");
if (fails > 0) {
Assert.fail("fails:" + fails);
}
}
protected void print(final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " waste:" + (System.currentTimeMillis() - start) + "ms");
}
protected void resetSerial(final AtomicInteger serial) {
serial.set(10 * timesPerThread);
}
protected AtomicInteger createSerial() {
return new AtomicInteger(10 * timesPerThread);
}
protected void submit(Runnable run) {
for (int p = 0; p < threadNum; p++) {
pool.submit(run);
}
}
protected CountDownLatch createLatch() {
return new CountDownLatch(threadNum);
}
@Before
public void setUp() throws IOException {
try {
super.setUp();
} catch (Exception e1) {
e1.printStackTrace();
}
try {
pool = Executors.newFixedThreadPool(threadNum);
bucket = createBucket();
} catch (Exception e) {
e.printStackTrace();
}
}
protected abstract Bucket<String> createBucket() throws Exception;
@After
public void tearDown() throws Exception {
super.tearDown();
bucket.close();
bucket.deleteAndCreate();
}
@Test
public void testConcurrentRead() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
String value = "value:" + id;
String target = bucket.findById(id);
Assert.assertEquals(value, target);
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentReadWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
String value = "value:" + id;
Assert.assertTrue(bucket.storeById(id, value));
String target = bucket.findById(id);
Assert.assertEquals(value, target);
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
try {
String id = "" + serial.incrementAndGet();
String value = "value:" + id;
boolean success = bucket.storeById(id, value);
if (!success) {
fail.incrementAndGet();
}
} catch (Throwable e) {
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
resetSerial(serial);
this.serialRead(serial);
}
@Test
public void testSerialRead() throws Exception {
final AtomicInteger serial = createSerial();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
serialRead(serial);
print(start);
}
@Test
public void testReload() throws Exception {
final AtomicInteger serial = createSerial();
this.serialWrite(serial);
resetSerial(serial);
this.bucket.close();
long start = System.currentTimeMillis();
bucket = createBucket();
print(start);
serialRead(serial);
}
@Test
public void testSerialWrite() throws Exception {
final AtomicInteger serial = createSerial();
long start = System.currentTimeMillis();
this.serialWrite(serial);
print(start);
resetSerial(serial);
this.serialRead(serial);
}
private void serialRead(final AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
String target = bucket.findById(id);
Assert.assertEquals("value:" + id, target);
}
}
}
private void serialWrite(AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
String value = "value:" + id;
Assert.assertTrue(bucket.storeById(id, value));
}
}
}
}
......@@ -2,217 +2,17 @@ package com.dianping.cat.storage.message;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.Bucket;
import com.site.lookup.ComponentTestCase;
public class LocalLogviewBucketTest extends ComponentTestCase {
protected final static int threadNum = 10;// notice: max 9, for creating asc order id bellow
protected final static int timesPerThread = 1000; // notice: must be powers 10, fro creating asc order id bellow
protected void printFails(final int fails, final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " fails:" + fails + " waste:" + (System.currentTimeMillis() - start) + "ms");
if (fails > 0) {
Assert.fail("fails:" + fails);
}
}
protected void print(final long start) {
System.out.println(new Throwable().getStackTrace()[1].toString() + " threads:" + threadNum + " total:" + threadNum * timesPerThread + " waste:" + (System.currentTimeMillis() - start) + "ms");
}
protected void resetSerial(final AtomicInteger serial) {
serial.set(10 * timesPerThread);
}
protected AtomicInteger createSerial() {
return new AtomicInteger(10 * timesPerThread);
}
final ExecutorService pool = Executors.newFixedThreadPool(threadNum);
protected void submit(Runnable run) {
for (int p = 0; p < threadNum; p++) {
pool.submit(run);
}
}
protected CountDownLatch createLatch() {
return new CountDownLatch(threadNum);
}
import com.dianping.cat.storage.MesageTreeBucketTestCase;
Bucket<MessageTree> bucket = null;
public class LocalLogviewBucketTest extends MesageTreeBucketTestCase {
@SuppressWarnings("unchecked")
@Before
public void setUp() throws IOException {
try {
super.setUp();
} catch (Exception e1) {
e1.printStackTrace();
}
try {
bucket = lookup(Bucket.class, MessageTree.class.getName() + "-logview");
bucket.initialize(null, "cat", new Date());
} catch (Exception e) {
e.printStackTrace();
}
protected Bucket<MessageTree> createBucket() throws Exception, IOException {
Bucket<MessageTree> bucket = lookup(Bucket.class, MessageTree.class.getName() + "-logview");
bucket.initialize(null, "cat", new Date());
return bucket;
}
@After
public void tearDown() throws Exception {
super.tearDown();
bucket.close();
bucket.deleteAndCreate();
}
@Test
public void testConcurrentRead() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentReadWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
String id = null;
String expect = null;
try {
id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
Assert.assertTrue(bucket.storeById(id, mt));
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
} catch (Throwable e) {
System.out.println(Thread.currentThread().getName() + ":" + id + ":" + expect);
e.printStackTrace();
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
}
@Test
public void testConcurrentWrite() throws Exception {
final AtomicInteger serial = createSerial();
final AtomicInteger fail = new AtomicInteger();
final CountDownLatch latch = createLatch();
long start = System.currentTimeMillis();
submit(new Runnable() {
public void run() {
for (int i = 0; i < timesPerThread; i++) {
try {
String id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
boolean success = bucket.storeById(id, mt);
if (!success) {
fail.incrementAndGet();
}
} catch (Throwable e) {
fail.incrementAndGet();
}
}
latch.countDown();
}
});
latch.await();
printFails(fail.get(), start);
resetSerial(serial);
this.serialRead(serial);
}
@Test
public void testSerialRead() throws Exception {
final AtomicInteger serial = createSerial();
this.serialWrite(serial);
resetSerial(serial);
long start = System.currentTimeMillis();
serialRead(serial);
print(start);
}
@Test
public void testSerialWrite() throws Exception {
final AtomicInteger serial = createSerial();
long start = System.currentTimeMillis();
this.serialWrite(serial);
print(start);
resetSerial(serial);
this.serialRead(serial);
}
private void serialRead(final AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
MessageTree target = bucket.findById(id);
Assert.assertEquals(id, target.getMessageId());
}
}
}
private void serialWrite(AtomicInteger serial) throws IOException {
for (int p = 0; p < threadNum; p++) {
for (int i = 0; i < timesPerThread; i++) {
String id = "" + serial.incrementAndGet();
DefaultMessageTree mt = new DefaultMessageTree();
mt.setMessageId(id);
Assert.assertTrue(bucket.storeById(id, mt));
}
}
}
}
package com.dianping.cat.storage.report;
import java.io.IOException;
import java.util.Date;
import org.junit.Ignore;
import org.junit.Test;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.StringBucketTestCase;
public class LocalReportBucketTest extends StringBucketTestCase {
@SuppressWarnings("unchecked")
protected Bucket<String> createBucket() throws Exception, IOException {
Bucket<String> bucket = lookup(Bucket.class, String.class.getName() + "-report");
bucket.initialize(null, "cat", new Date());
return bucket;
}
@Test
@Ignore
public void testReload() throws Exception {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册