提交 e69d652c 编写于 作者: Y You Yong

modify the test case and delete the in memory queue

上级 c671cf09
......@@ -13,9 +13,6 @@ import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.io.DefaultTransportManager;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.InMemoryReceiver;
import com.dianping.cat.message.io.InMemorySender;
import com.dianping.cat.message.io.MessageReceiver;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.io.TcpSocketHierarchySender;
......@@ -57,12 +54,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(ClientConfigManager.class));
all.add(C(ServerConfigManager.class));
all.add(C(InMemoryQueue.class));
all.add(C(MessageSender.class, "in-memory", InMemorySender.class) //
.req(InMemoryQueue.class));
all.add(C(MessageReceiver.class, "in-memory", InMemoryReceiver.class) //
.req(InMemoryQueue.class));
all.add(C(MessageManager.class, DefaultMessageManager.class) //
.req(ClientConfigManager.class, TransportManager.class, MessageStatistics.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class) //
......
......@@ -35,7 +35,7 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
private MessageIdFactory m_factory;
private long m_throttleTimes = 9999;
private long m_throttleTimes = 0;
// we don't use static modifier since MessageManager is a singleton in
// production actually
......@@ -90,8 +90,8 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
} else {
m_throttleTimes++;
if (m_throttleTimes % 10000 == 0) {
if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
}
}
......@@ -189,10 +189,11 @@ public class DefaultMessageManager extends ContainerHolder implements MessageMan
}
return false;
//if (tree.getMessage() != null && "Heartbeat".equals(tree.getMessage().getName())) {
// return false;
//}
// if (tree.getMessage() != null &&
// "Heartbeat".equals(tree.getMessage().getName())) {
// return false;
// }
// int threadCount = ManagementFactory.getThreadMXBean().getThreadCount();
// return threadCount > m_domain.getMaxThreads();
}
......
......@@ -24,10 +24,6 @@ public class DefaultTransportManager extends ContainerHolder implements Transpor
@Override
public MessageSender getSender() {
if (m_sender == null) {
throw new RuntimeException("Server mode only, no sender is provided!");
}
return m_sender;
}
......@@ -36,12 +32,12 @@ public class DefaultTransportManager extends ContainerHolder implements Transpor
List<Server> servers = m_configManager.getServers();
if (!m_configManager.isCatEnabled()) {
m_sender = lookup(MessageSender.class, "in-memory");
m_sender = null;
if (m_configManager.isInitialized()) {
m_logger.warn("CAT was DISABLED explicitly, an in-memory sender used instead!");
m_logger.warn("CAT was DISABLED explicitly!");
} else {
m_logger.warn("CAT was DISABLED due to not initialized yet, an in-memory sender used instead!");
m_logger.warn("CAT was DISABLED due to not initialized yet!");
}
} else {
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
......
package com.dianping.cat.message.io;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessageTree;
public class InMemoryQueue implements LogEnabled, Initializable {
private BlockingQueue<MessageTree> m_queue;
private int m_queueSize;
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public void initialize() throws InitializationException {
if (m_queueSize <= 0) {
m_queue = new LinkedBlockingQueue<MessageTree>(1000);
} else {
m_queue = new LinkedBlockingQueue<MessageTree>(m_queueSize);
}
}
public void offer(MessageTree tree) {
while (!m_queue.offer(tree)) {
// throw away the tree at the tail
MessageTree m = m_queue.poll();
if (m == null) {
break;
} else {
m_logger.warn(tree + " was thrown away due to queue is full!");
}
}
}
public MessageTree peek() {
return m_queue.peek();
}
public MessageTree poll(long timeout) throws InterruptedException {
if (timeout <= 0) {
return m_queue.poll();
} else {
return m_queue.poll(timeout, TimeUnit.MILLISECONDS);
}
}
public void setQueueSize(int queueSize) {
m_queueSize = queueSize;
}
public int size() {
return m_queue.size();
}
public void clear() {
m_queue.clear();
}
}
package com.dianping.cat.message.io;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class InMemoryReceiver implements MessageReceiver {
@Inject
private InMemoryQueue m_queue;
private transient boolean m_active = true;
@Override
public void initialize() {
}
public boolean isActive() {
synchronized (this) {
return m_active;
}
}
@Override
public void onMessage(MessageHandler handler) {
try {
while (true) {
MessageTree tree = m_queue.poll(1);
if (tree != null) {
handler.handle(tree);
} else if (!isActive()) {
break;
}
}
} catch (InterruptedException e) {
// ignore it
}
}
public void setQueue(InMemoryQueue queue) {
m_queue = queue;
}
@Override
public void shutdown() {
synchronized (this) {
m_active = false;
}
}
}
package com.dianping.cat.message.io;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;
public class InMemorySender implements MessageSender {
@Inject
private InMemoryQueue m_queue;
private transient boolean m_active = true;
public InMemoryQueue getQueue() {
return m_queue;
}
@Override
public void initialize() {
}
public boolean isActive() {
synchronized (this) {
return m_active;
}
}
@Override
public void send(MessageTree tree) {
if (isActive()) {
m_queue.offer(tree);
}
}
public void setQueue(InMemoryQueue queue) {
m_queue = queue;
}
@Override
public void shutdown() {
synchronized (this) {
m_active = false;
}
}
}
......@@ -49,9 +49,9 @@ public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled
private transient boolean m_active;
private AtomicInteger m_errors = new AtomicInteger(999);
private AtomicInteger m_errors = new AtomicInteger();
private AtomicInteger m_attempts = new AtomicInteger(999);
private AtomicInteger m_attempts = new AtomicInteger();
boolean checkWritable(ChannelFuture future) {
boolean isWriteable = false;
......@@ -62,7 +62,7 @@ public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled
} else {
int count = m_attempts.incrementAndGet();
if (count % 1000 == 0) {
if (count % 1000 == 0 || count == 1) {
m_logger.error("Netty write buffer is full! Attempts: " + count);
}
}
......@@ -131,7 +131,7 @@ public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled
int count = m_errors.incrementAndGet();
if (count % 1000 == 0) {
if (count % 1000 == 0 || count == 1) {
m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
}
}
......@@ -228,7 +228,7 @@ public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled
if (!future.isSuccess()) {
future.getChannel().getCloseFuture().awaitUninterruptibly();
int count = m_reconnects.incrementAndGet();
if (count % 1000 == 0) {
m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause());
}
......
......@@ -54,11 +54,11 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
private transient boolean m_active;
private AtomicInteger m_errors = new AtomicInteger(999);
private AtomicInteger m_errors = new AtomicInteger();
private AtomicInteger m_attempts = new AtomicInteger(999);
private AtomicInteger m_attempts = new AtomicInteger();
private AtomicInteger m_reconnects = new AtomicInteger(999);
private AtomicInteger m_reconnects = new AtomicInteger();
@Override
public void enableLogging(Logger logger) {
......@@ -117,8 +117,8 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
if (!future.isSuccess()) {
int count = m_reconnects.incrementAndGet();
if (count % 1000 == 0) {
if (count % 1000 == 0 || count == 1) {
m_logger.error("Error when reconnecting to " + m_serverAddress, future.getCause());
}
} else {
......@@ -175,7 +175,7 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
} else {
int count = m_attempts.incrementAndGet();
if (count % 1000 == 0) {
if (count % 1000 == 0 || count == 1) {
m_logger.error("Netty write buffer is full! Attempts: " + count);
}
}
......@@ -195,7 +195,7 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
int count = m_errors.incrementAndGet();
if (count % 1000 == 0) {
if (count % 1000 == 0 || count == 1) {
m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
}
}
......
......@@ -20,6 +20,7 @@ public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder impleme
process(tree);
} catch (Throwable e) {
m_errors++;
if (m_errors % 10000 == 0) {
e.printStackTrace();
}
......@@ -35,6 +36,7 @@ public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder impleme
process(tree);
} catch (Throwable e) {
m_errors++;
if (m_errors % 10000 == 0) {
e.printStackTrace();
}
......
......@@ -8,30 +8,6 @@
<role>com.dianping.cat.configuration.ServerConfigManager</role>
<implementation>com.dianping.cat.configuration.ServerConfigManager</implementation>
</component>
<component>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
<implementation>com.dianping.cat.message.io.InMemoryQueue</implementation>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>in-memory</role-hint>
<implementation>com.dianping.cat.message.io.InMemorySender</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.io.MessageReceiver</role>
<role-hint>in-memory</role-hint>
<implementation>com.dianping.cat.message.io.InMemoryReceiver</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.io.InMemoryQueue</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageManager</role>
<implementation>com.dianping.cat.message.internal.DefaultMessageManager</implementation>
......
......@@ -11,7 +11,6 @@ import com.dianping.cat.message.TransactionTest;
import com.dianping.cat.message.configuration.ClientConfigTest;
import com.dianping.cat.message.internal.MessageIdFactoryTest;
import com.dianping.cat.message.internal.MillisSecondTimerTest;
import com.dianping.cat.message.io.InMemoryTest;
import com.dianping.cat.message.io.TcpSocketTest;
import com.dianping.cat.message.spi.codec.HtmlMessageCodecTest;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodecTest;
......@@ -48,8 +47,6 @@ MillisSecondTimerTest.class,
DefaultMessagePathBuilderTest.class,
/* .io */
InMemoryTest.class,
TcpSocketTest.class,
/* .spi.codec */
......
......@@ -43,7 +43,7 @@ public class CatTest {
}
@Test
public void testWithoutGlobalConfigInitialize() throws InterruptedException {
public void testWithNoExistGlobalConfigInitialize() throws InterruptedException {
Cat.initialize(new File("/data/appdatas/cat/clientNoExist.xml"));
MessageProducer cat = Cat.getProducer();
Transaction t = cat.newTransaction("TestType", "TestName");
......
......@@ -13,6 +13,7 @@ import com.dianping.cat.message.Transaction;
public class CatAppenderTest {
@Test
public void testWithTransaction() throws InterruptedException {
Cat.destroy();
CatAppender appender = new CatAppender();
Throwable throwable = new Exception();
Category logger = Logger.getLogger(CatAppenderTest.class);
......@@ -29,6 +30,7 @@ public class CatAppenderTest {
@Test
public void testWithoutTransaction() throws InterruptedException {
Cat.destroy();
CatAppender appender = new CatAppender();
Throwable throwable = new Exception();
Category logger = Logger.getLogger(CatAppenderTest.class);
......
......@@ -7,7 +7,7 @@ import org.junit.runners.JUnit4;
import com.dianping.cat.Cat;
@RunWith(JUnit4.class)
public class EventTest extends CatTestCase {
public class EventTest{
@Test
public void testNormal() {
Event event = Cat.getProducer().newEvent("Review", "New");
......
......@@ -7,7 +7,7 @@ import org.junit.runners.JUnit4;
import com.dianping.cat.Cat;
@RunWith(JUnit4.class)
public class HeartbeatTest extends CatTestCase {
public class HeartbeatTest{
@Test
public void testInOneShot() {
Cat.getProducer().logHeartbeat("System", "Status", "0",
......
......@@ -7,7 +7,7 @@ import org.junit.runners.JUnit4;
import com.dianping.cat.Cat;
@RunWith(JUnit4.class)
public class TransactionTest extends CatTestCase {
public class TransactionTest{
@Test
public void testNormal() {
Transaction t = Cat.getProducer().newTransaction("URL", "MyPage");
......
package com.dianping.cat.message.io;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import junit.framework.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class InMemoryTest extends ComponentTestCase {
@Test
public void test() throws Exception {
final MessageSender sender = lookup(MessageSender.class, "in-memory");
final MessageReceiver receiver = lookup(MessageReceiver.class, "in-memory");
final int len = 1000;
final StringBuilder sb = new StringBuilder(len);
ExecutorService pool = Executors.newFixedThreadPool(3);
List<Future<?>> futures = new ArrayList<Future<?>>();
futures.add(pool.submit(new Runnable() {
@Override
public void run() {
receiver.initialize();
receiver.onMessage(new MockMessageHandler(sb));
}
}));
futures.add(pool.submit(new Runnable() {
@Override
public void run() {
sender.initialize();
for (int i = 0; i < len; i++) {
sender.send(new DefaultMessageTree());
}
sender.shutdown();
receiver.shutdown();
}
}));
for (Future<?> future : futures) {
future.get();
}
pool.shutdown();
Assert.assertEquals(len, sb.length());
}
static class MockMessageHandler implements MessageHandler {
private StringBuilder m_sb;
public MockMessageHandler(StringBuilder sb) {
m_sb = sb;
}
@Override
public void handle(MessageTree tree) {
m_sb.append('.');
}
}
}
......@@ -10,9 +10,9 @@ import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.io.DefaultTransportManager;
import com.dianping.cat.message.io.InMemoryQueue;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.helper.Stringizers;
......@@ -26,7 +26,7 @@ public class SqlJobDataProduceTest extends CatTestCase {
DefaultTransportManager transport = (DefaultTransportManager) lookup(TransportManager.class);
MessageSender messageSender = lookup(MessageSender.class, "in-memory");
transport.setSender(messageSender);
InMemoryQueue queue = lookup(InMemoryQueue.class);
MessageQueue queue = lookup(MessageQueue.class);
long currentTimeMillis = System.currentTimeMillis();
long currentHour = currentTimeMillis - currentTimeMillis % (60 * 60 * 1000);
......@@ -76,7 +76,7 @@ public class SqlJobDataProduceTest extends CatTestCase {
} finally {
t.complete();
}
MessageTree tree = queue.poll(0);
MessageTree tree = queue.poll();
tree.setDomain("domain" + i);
storage.store(tree);
}
......
......@@ -59,8 +59,7 @@ public abstract class AbstractReportPayload<A extends Action> implements ActionP
cal.set(Calendar.DATE, 1);
m_date = cal.getTimeInMillis();
} else if ("week".equals(m_reportType)) {
m_date = m_date - (ONE_HOUR) * (weekOfDay - 1) * 24;
m_date = m_date + ONE_HOUR * 24;
m_date = m_date - (ONE_HOUR) * (weekOfDay % 7) * 24;
if (m_date > System.currentTimeMillis()) {
m_date = m_date - 7 * 24 * ONE_HOUR;
}
......@@ -130,7 +129,7 @@ public abstract class AbstractReportPayload<A extends Action> implements ActionP
int maxDay = cal.getActualMaximum(Calendar.DAY_OF_MONTH);
temp = m_date + maxDay * (ONE_HOUR * 24);
} else if ("week".equals(m_reportType)) {
temp = m_date + 8 * (ONE_HOUR * 24);
temp = m_date + 7 * (ONE_HOUR * 24);
} else {
temp = m_date + (ONE_HOUR * 24);
}
......
......@@ -99,7 +99,6 @@ public class Handler implements PageHandler<Context> {
case MOBILE_IP:
String ip = payload.getIp();
String location = IPSeekerManager.getLocation(ip);
System.out.println(ip + ":" + location);
model.setMobileResponse(location);
break;
}
......
......@@ -10,29 +10,29 @@ import com.google.gson.Gson;
public class GraphItem {
private double[] ylable;
private double[] m_ylable;
private String titles;
private String m_titles;
private String start;
private String m_start;
private int size;
private int m_size;
private List<String> subTitles = new ArrayList<String>();
private List<String> m_subTitles = new ArrayList<String>();
private List<double[]> values = new ArrayList<double[]>();
private List<double[]> m_values = new ArrayList<double[]>();
private transient SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm");
public GraphItem() {
}
public GraphItem addSubTitle(String title) {
subTitles.add(title);
m_subTitles.add(title);
return this;
}
public GraphItem addValue(double[] value) {
values.add(value);
m_values.add(value);
return this;
}
......@@ -42,58 +42,58 @@ public class GraphItem {
}
public String getStart() {
return start;
return m_start;
}
public void setStart(Date start) {
this.start = sdf.format(start);
m_start = sdf.format(start);
}
public int getSize() {
return size;
return m_size;
}
public void setSize(int size) {
this.size = size;
m_size = size;
}
public void setStart(String start) {
this.start = start;
m_start = start;
}
public double[] getYlable() {
return ylable;
return m_ylable;
}
public void setYlable(double[] ylable) {
if (ylable == null) {
this.ylable = new double[0];
m_ylable = new double[0];
} else {
this.ylable = Arrays.copyOf(ylable, ylable.length);
m_ylable = Arrays.copyOf(ylable, ylable.length);
}
}
public String getTitles() {
return titles;
return m_titles;
}
public void setTitles(String titles) {
this.titles = titles;
m_titles = titles;
}
public List<String> getSubTitles() {
return subTitles;
return m_subTitles;
}
public void setSubTitles(List<String> subTitles) {
this.subTitles = subTitles;
m_subTitles = subTitles;
}
public List<double[]> getValues() {
return values;
return m_values;
}
public void setValues(List<double[]> values) {
this.values = values;
m_values = values;
}
}
package com.dianping.cat;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.webapp.WebAppContext;
public class SimpleServer {
public static void main(String[] args) throws Exception {
Server server = new Server(2281);
WebAppContext context = new WebAppContext();
context.setContextPath("/cat");
context.setDescriptor("src/main/webapp/WEB-INF/web.xml");
context.setResourceBase("src/main/webapp");
server.setHandler(context);
server.start();
server.join();
}
}
......@@ -125,9 +125,7 @@ public class PayloadTest {
temp = cal.getTimeInMillis();
int weekOfDay = cal.get(Calendar.DAY_OF_WEEK);
temp = temp - 24 * (weekOfDay - 1) * ONE_HOUR;
temp = temp + 24 * ONE_HOUR;
temp = temp - 24 * (weekOfDay) * ONE_HOUR;
if (temp > System.currentTimeMillis()) {
temp = temp - 7 * ONE_DAY;
}
......@@ -138,30 +136,33 @@ public class PayloadTest {
String lastOne = sdf.format(lastOneWeek);
String current = sdf.format(currentWeek);
System.out.println(current);
System.out.println(lastOne);
System.out.println(lastTwo);
payload.setDate(sdf.format(input));
payload.setStep(-1);
payload.computeStartDate();
checkDate(lastOne, payload.getHistoryStartDate());
checkDate(sdf.format(new Date(lastOneWeek.getTime() + 8 * ONE_DAY)), payload.getHistoryEndDate());
checkDate(sdf.format(new Date(lastOneWeek.getTime() + 7 * ONE_DAY)), payload.getHistoryEndDate());
payload.computeStartDate();
checkDate(lastTwo, payload.getHistoryStartDate());
checkDate(sdf.format(new Date(lastTwoWeek.getTime() + 8 * ONE_DAY)), payload.getHistoryEndDate());
checkDate(sdf.format(new Date(lastTwoWeek.getTime() + 7 * ONE_DAY)), payload.getHistoryEndDate());
payload.setStep(1);
payload.computeStartDate();
checkDate(lastOne, payload.getHistoryStartDate());
checkDate(sdf.format(new Date(lastOneWeek.getTime() + 8 * ONE_DAY)), payload.getHistoryEndDate());
checkDate(sdf.format(new Date(lastOneWeek.getTime() + 7 * ONE_DAY)), payload.getHistoryEndDate());
payload.computeStartDate();
payload.setStep(1);
checkDate(current, payload.getHistoryStartDate());
checkDate(sdf.format(currentWeek.getTime() + 8 * ONE_DAY), payload.getHistoryEndDate());
checkDate(sdf.format(currentWeek.getTime() + 7 * ONE_DAY), payload.getHistoryEndDate());
payload.computeStartDate();
checkDate(current, payload.getHistoryStartDate());
checkDate(sdf.format(currentWeek.getTime() + 8 * ONE_DAY), payload.getHistoryEndDate());
checkDate(sdf.format(currentWeek.getTime() + 7 * ONE_DAY), payload.getHistoryEndDate());
}
@Test
......
......@@ -5,7 +5,7 @@
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>0.3.4</version>
<name>arch-CAT</name>
<name>arch-cat</name>
<packaging>pom</packaging>
<modules>
<module>cat-core</module>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册