diff --git a/cat-client/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java b/cat-client/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java index 6e340cdb00fa59c8fab871f45f8c097222af7854..e0a0c0f4e5b2d16e6b02f6683e59a326b268097c 100644 --- a/cat-client/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java +++ b/cat-client/src/main/java/com/dianping/cat/build/ComponentsConfigurator.java @@ -17,7 +17,6 @@ import com.dianping.cat.message.internal.DefaultMessageProducer; import com.dianping.cat.message.internal.MessageIdFactory; import com.dianping.cat.message.io.DefaultTransportManager; import com.dianping.cat.message.io.MessageSender; -import com.dianping.cat.message.io.TcpSocketHierarchySender; import com.dianping.cat.message.io.TcpSocketSender; import com.dianping.cat.message.io.TransportManager; import com.dianping.cat.message.spi.MessageCodec; @@ -48,10 +47,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { .is(PER_LOOKUP) // .req(MessageStatistics.class, "default", "m_statistics") // .req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec")); - all.add(C(MessageSender.class, TcpSocketHierarchySender.ID, TcpSocketHierarchySender.class) // - .is(PER_LOOKUP) // - .req(MessageStatistics.class, "default", "m_statistics") // - .req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec")); all.add(C(TransportManager.class, DefaultTransportManager.class) // .req(ClientConfigManager.class)); diff --git a/cat-client/src/main/java/com/dianping/cat/message/io/DefaultTransportManager.java b/cat-client/src/main/java/com/dianping/cat/message/io/DefaultTransportManager.java index d6eed3edbafb63639103c3033c73817f2f87d582..fda5d07b23a02933d4e5a1014784202cd66194f0 100644 --- a/cat-client/src/main/java/com/dianping/cat/message/io/DefaultTransportManager.java +++ b/cat-client/src/main/java/com/dianping/cat/message/io/DefaultTransportManager.java @@ -57,15 +57,9 @@ public class DefaultTransportManager extends ContainerHolder implements Transpor if (addresses.isEmpty()) { throw new RuntimeException("All servers in configuration are disabled!\r\n" + servers); - } else if (addresses.size() == 1) { - TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class, TcpSocketSender.ID); - - sender.setServerAddress(addresses.get(0)); - sender.initialize(); - m_sender = sender; } else { - TcpSocketHierarchySender sender = (TcpSocketHierarchySender) lookup(MessageSender.class, - TcpSocketHierarchySender.ID); + TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class, + TcpSocketSender.ID); sender.setServerAddresses(addresses); sender.initialize(); diff --git a/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketHierarchySender.java b/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketHierarchySender.java deleted file mode 100644 index e85e4bee75311e74dcc4be4943697023139013c4..0000000000000000000000000000000000000000 --- a/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketHierarchySender.java +++ /dev/null @@ -1,324 +0,0 @@ -package com.dianping.cat.message.io; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.codehaus.plexus.logging.LogEnabled; -import org.codehaus.plexus.logging.Logger; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.unidal.helper.Threads; -import org.unidal.helper.Threads.Task; -import org.unidal.lookup.annotation.Inject; - -import com.dianping.cat.Cat; -import com.dianping.cat.message.spi.MessageCodec; -import com.dianping.cat.message.spi.MessageQueue; -import com.dianping.cat.message.spi.MessageStatistics; -import com.dianping.cat.message.spi.MessageTree; - -public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled { - public static final String ID = "tcp-socket-hierarchy"; - - @Inject - private MessageCodec m_codec; - - @Inject - private MessageStatistics m_statistics; - - private MessageQueue m_queue = new DefaultMessageQueue(10000); - - private List m_serverAddresses; - - private ChannelManager m_manager; - - private Logger m_logger; - - private transient boolean m_active; - - private AtomicInteger m_errors = new AtomicInteger(); - - private AtomicInteger m_attempts = new AtomicInteger(); - - boolean checkWritable(ChannelFuture future) { - boolean isWriteable = false; - - if (future != null && future.getChannel().isOpen()) { - if (future.getChannel().isWritable()) { - isWriteable = true; - } else { - int count = m_attempts.incrementAndGet(); - - if (count % 1000 == 0 || count == 1) { - m_logger.error("Netty write buffer is full! Attempts: " + count); - } - } - } - - return isWriteable; - } - - @Override - public void enableLogging(Logger logger) { - m_logger = logger; - } - - @Override - public String getName() { - return "TcpSocketHierarchySender"; - } - - @Override - public void initialize() { - m_manager = new ChannelManager(m_logger, m_serverAddresses); - - Threads.forGroup("Cat").start(this); - Threads.forGroup("Cat").start(m_manager); - } - - @Override - public void run() { - m_active = true; - - while (m_active) { - ChannelFuture future = m_manager.getChannel(); - - if (checkWritable(future)) { - try { - MessageTree tree = m_queue.poll(); - - if (tree != null) { - sendInternal(tree); - tree.setMessage(null); - } - } catch (Throwable t) { - m_logger.error("Error when sending message over TCP socket!", t); - } - } else { - try { - Thread.sleep(5); - } catch (Exception e) { - // ignore it - m_active = false; - } - } - } - - m_manager.releaseAll(); - } - - @Override - public void send(MessageTree tree) { - boolean result = m_queue.offer(tree); - - if (!result) { - if (m_statistics != null) { - m_statistics.onOverflowed(tree); - } - - int count = m_errors.incrementAndGet(); - - if (count % 1000 == 0 || count == 1) { - m_logger.error("Message queue is full in tcp socket sender! Count: " + count); - } - } - } - - private void sendInternal(MessageTree tree) { - ChannelFuture future = m_manager.getChannel(); - ChannelBuffer buf = ChannelBuffers.dynamicBuffer(10 * 1024); // 10K - - m_codec.encode(tree, buf); - - int size = buf.readableBytes(); - - future.getChannel().write(buf); - - if (m_statistics != null) { - m_statistics.onBytes(size); - } - } - - public void setCodec(MessageCodec codec) { - m_codec = codec; - } - - public void setServerAddresses(List serverAddresses) { - m_serverAddresses = serverAddresses; - } - - @Override - public void shutdown() { - m_active = false; - m_manager.shutdown(); - } - - static class ChannelManager implements Task { - private List m_serverAddresses; - - private List m_futures; - - private ClientBootstrap m_bootstrap; - - private ChannelFuture m_activeFuture; - - private int m_activeIndex; - - private Logger m_logger; - - private ChannelFuture m_lastFuture; - - private boolean m_active = true; - - private AtomicInteger m_reconnects = new AtomicInteger(999); - - public ChannelManager(Logger logger, List serverAddresses) { - int len = serverAddresses.size(); - - m_logger = logger; - m_serverAddresses = serverAddresses; - m_futures = new ArrayList(Collections. nCopies(len, null)); - - ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Boss", 10); - ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10); - ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor); - ClientBootstrap bootstrap = new ClientBootstrap(factory); - - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() { - return Channels.pipeline(new MyHandler(m_logger)); - } - }); - - bootstrap.setOption("tcpNoDelay", true); - bootstrap.setOption("keepAlive", true); - - m_bootstrap = bootstrap; - - for (int i = 0; i < len; i++) { - ChannelFuture future = createChannel(i); - - if (future != null) { - m_activeFuture = future; - m_activeIndex = i; - break; - } - } - } - - ChannelFuture createChannel(int index) { - InetSocketAddress address = m_serverAddresses.get(index); - ChannelFuture future = m_bootstrap.connect(address); - - future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms - - if (!future.isSuccess()) { - future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms - int count = m_reconnects.incrementAndGet(); - - if (count % 1000 == 0) { - m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause()); - } - return null; - } else { - m_logger.info("Connected to CAT server at " + address); - return future; - } - } - - public ChannelFuture getChannel() { - if (m_lastFuture != null && m_lastFuture != m_activeFuture) { - m_lastFuture.getChannel().close(); - m_lastFuture = null; - } - - return m_activeFuture; - } - - @Override - public String getName() { - return "TcpSocketHierarchySender-ChannelManager"; - } - - public void releaseAll() { - for (ChannelFuture future : m_futures) { - if (future != null) { - future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms - } - } - - m_bootstrap.getFactory().releaseExternalResources(); - m_futures = null; - } - - @Override - public void run() { - try { - while (m_active) { - try { - if (m_activeFuture != null && !m_activeFuture.getChannel().isOpen()) { - m_activeIndex = m_serverAddresses.size(); - } - - for (int i = 0; i < m_activeIndex; i++) { - ChannelFuture future = createChannel(i); - - if (future != null) { - m_lastFuture = m_activeFuture; - m_activeFuture = future; - m_activeIndex = i; - break; - } - } - } catch (Throwable e) { - Cat.logError(e); - } - - Thread.sleep(2 * 1000L); // check every 2 seconds - } - } catch (InterruptedException e) { - // ignore - } - } - - @Override - public void shutdown() { - m_active = false; - } - } - - static class MyHandler extends SimpleChannelHandler { - private Logger m_logger; - - public MyHandler(Logger logger) { - m_logger = logger; - } - - @Override - public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - e.getChannel().close(); - } - } -} diff --git a/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java b/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java index d3886d30403009cb4bf3b09d8c806becc927b5eb..314fe860cde2ac6685ee4227443181007c312eda 100644 --- a/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java +++ b/cat-client/src/main/java/com/dianping/cat/message/io/TcpSocketSender.java @@ -1,6 +1,9 @@ package com.dianping.cat.message.io; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -24,33 +27,26 @@ import org.unidal.helper.Threads; import org.unidal.helper.Threads.Task; import org.unidal.lookup.annotation.Inject; +import com.dianping.cat.Cat; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessageQueue; import com.dianping.cat.message.spi.MessageStatistics; import com.dianping.cat.message.spi.MessageTree; public class TcpSocketSender implements Task, MessageSender, LogEnabled { - public static final String ID = "tcp-socket"; - + public static final String ID = "tcp-socket-hierarchy"; + @Inject private MessageCodec m_codec; @Inject private MessageStatistics m_statistics; - private MessageQueue m_queue = new DefaultMessageQueue(10000);; - - private InetSocketAddress m_serverAddress; - - private ChannelFactory m_factory; - - private ChannelFuture m_future; + private MessageQueue m_queue = new DefaultMessageQueue(100000); - private ClientBootstrap m_bootstrap; + private List m_serverAddresses; - private int m_reconnectPeriod = 5000; // every 5 seconds - - private long m_lastReconnectTime; + private ChannelManager m_manager; private Logger m_logger; @@ -60,8 +56,6 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { private AtomicInteger m_attempts = new AtomicInteger(); - private AtomicInteger m_reconnects = new AtomicInteger(); - boolean checkWritable(ChannelFuture future) { boolean isWriteable = false; @@ -87,69 +81,15 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { @Override public String getName() { - return "TcpSocketSender"; + return "TcpSocketHierarchySender"; } @Override public void initialize() { - if (m_serverAddress == null) { - throw new RuntimeException("No server address was configured for TcpSocketSender!"); - } - ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool( - "Cat-TcpSocketSender-Boss-" + m_serverAddress, 10); - ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10); - ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor); - ClientBootstrap bootstrap = new ClientBootstrap(factory); - - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() { - return Channels.pipeline(new ExceptionHandler()); - } - }); - - bootstrap.setOption("tcpNoDelay", true); - bootstrap.setOption("keepAlive", true); - - ChannelFuture future = bootstrap.connect(m_serverAddress); - - future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms - - if (!future.isSuccess()) { - m_logger.error("Error when connecting to " + m_serverAddress, future.getCause()); - } else { - m_factory = factory; - m_future = future; - m_logger.info("Connected to CAT server at " + m_serverAddress); - } + m_manager = new ChannelManager(m_logger, m_serverAddresses); - m_bootstrap = bootstrap; Threads.forGroup("Cat").start(this); - } - - public void reconnect() { - long now = System.currentTimeMillis(); - - if (m_lastReconnectTime > 0 && m_lastReconnectTime + m_reconnectPeriod > now) { - return; - } - - m_lastReconnectTime = now; - - ChannelFuture future = m_bootstrap.connect(m_serverAddress); - - future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms - - if (!future.isSuccess()) { - int count = m_reconnects.incrementAndGet(); - - if (count % 1000 == 0 || count == 1) { - m_logger.error("Error when reconnecting to " + m_serverAddress, future.getCause()); - } - } else { - m_future = future; - m_logger.info("Reconnected to CAT server at " + m_serverAddress); - } + Threads.forGroup("Cat").start(m_manager); } @Override @@ -157,38 +97,30 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { m_active = true; while (m_active) { - try { - if (checkWritable(m_future)) { + ChannelFuture future = m_manager.getChannel(); + + if (checkWritable(future)) { + try { MessageTree tree = m_queue.poll(); if (tree != null) { sendInternal(tree); - tree.setMessage(null); } - } else { - try { - Thread.sleep(2); - } catch (Exception e) { - break; - } - - if (m_future == null || !m_future.getChannel().isOpen()) { - reconnect(); - } + } catch (Throwable t) { + m_logger.error("Error when sending message over TCP socket!", t); + } + } else { + try { + Thread.sleep(5); + } catch (Exception e) { + // ignore it + m_active = false; } - } catch (Throwable t) { - m_logger.error("Error when sending message over TCP socket!", t); } } - if (m_future != null) { - m_future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms - } - - if (m_factory != null) { - m_factory.releaseExternalResources(); - } + m_manager.releaseAll(); } @Override @@ -209,22 +141,17 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { } private void sendInternal(MessageTree tree) { - if (m_future == null || !m_future.getChannel().isOpen()) { - reconnect(); - } - - if (m_future != null && m_future.getChannel().isOpen()) { - ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8 * 1024); // 8K + ChannelFuture future = m_manager.getChannel(); + ChannelBuffer buf = ChannelBuffers.dynamicBuffer(10 * 1024); // 10K - m_codec.encode(tree, buf); + m_codec.encode(tree, buf); - int size = buf.readableBytes(); + int size = buf.readableBytes(); - m_future.getChannel().write(buf); + future.getChannel().write(buf); - if (m_statistics != null) { - m_statistics.onBytes(size); - } + if (m_statistics != null) { + m_statistics.onBytes(size); } } @@ -232,20 +159,158 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled { m_codec = codec; } - public void setReconnectPeriod(int reconnectPeriod) { - m_reconnectPeriod = reconnectPeriod; - } - - public void setServerAddress(InetSocketAddress serverAddress) { - m_serverAddress = serverAddress; + public void setServerAddresses(List serverAddresses) { + m_serverAddresses = serverAddresses; } @Override public void shutdown() { m_active = false; + m_manager.shutdown(); + } + + static class ChannelManager implements Task { + private List m_serverAddresses; + + private List m_futures; + + private ClientBootstrap m_bootstrap; + + private ChannelFuture m_activeFuture; + + private int m_activeIndex; + + private Logger m_logger; + + private ChannelFuture m_lastFuture; + + private boolean m_active = true; + + private AtomicInteger m_reconnects = new AtomicInteger(999); + + public ChannelManager(Logger logger, List serverAddresses) { + int len = serverAddresses.size(); + + m_logger = logger; + m_serverAddresses = serverAddresses; + m_futures = new ArrayList(Collections. nCopies(len, null)); + + ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Boss", 10); + ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10); + ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor); + ClientBootstrap bootstrap = new ClientBootstrap(factory); + + bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() { + return Channels.pipeline(new ExceptionHandler(m_logger)); + } + }); + + bootstrap.setOption("tcpNoDelay", true); + bootstrap.setOption("keepAlive", true); + + m_bootstrap = bootstrap; + + for (int i = 0; i < len; i++) { + ChannelFuture future = createChannel(i); + + if (future != null) { + m_activeFuture = future; + m_activeIndex = i; + break; + } + } + } + + ChannelFuture createChannel(int index) { + InetSocketAddress address = m_serverAddresses.get(index); + ChannelFuture future = m_bootstrap.connect(address); + + future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms + + if (!future.isSuccess()) { + future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms + int count = m_reconnects.incrementAndGet(); + + if (count % 1000 == 0) { + m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause()); + } + return null; + } else { + m_logger.info("Connected to CAT server at " + address); + return future; + } + } + + public ChannelFuture getChannel() { + if (m_lastFuture != null && m_lastFuture != m_activeFuture) { + m_lastFuture.getChannel().close(); + m_lastFuture = null; + } + + return m_activeFuture; + } + + @Override + public String getName() { + return "TcpSocketHierarchySender-ChannelManager"; + } + + public void releaseAll() { + for (ChannelFuture future : m_futures) { + if (future != null) { + future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms + } + } + + m_bootstrap.getFactory().releaseExternalResources(); + m_futures = null; + } + + @Override + public void run() { + try { + while (m_active) { + try { + if (m_activeFuture != null && !m_activeFuture.getChannel().isOpen()) { + m_activeIndex = m_serverAddresses.size(); + } + + for (int i = 0; i < m_activeIndex; i++) { + ChannelFuture future = createChannel(i); + + if (future != null) { + m_lastFuture = m_activeFuture; + m_activeFuture = future; + m_activeIndex = i; + break; + } + } + } catch (Throwable e) { + Cat.logError(e); + } + + Thread.sleep(2 * 1000L); // check every 2 seconds + } + } catch (InterruptedException e) { + // ignore + } + } + + @Override + public void shutdown() { + m_active = false; + } } - class ExceptionHandler extends SimpleChannelHandler { + static class ExceptionHandler extends SimpleChannelHandler { + private Logger m_logger; + + public ExceptionHandler(Logger logger) { + m_logger = logger; + } + @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress()); diff --git a/cat-client/src/main/resources/META-INF/plexus/components.xml b/cat-client/src/main/resources/META-INF/plexus/components.xml index 73126db1f4cbd835fb0f7e178dda94f6ef30300c..2326938bfe3e4173c98e948f0707dbc08613fc94 100644 --- a/cat-client/src/main/resources/META-INF/plexus/components.xml +++ b/cat-client/src/main/resources/META-INF/plexus/components.xml @@ -35,27 +35,10 @@ com.dianping.cat.message.internal.MessageIdFactory com.dianping.cat.message.internal.MessageIdFactory - - com.dianping.cat.message.io.MessageSender - tcp-socket - com.dianping.cat.message.io.TcpSocketSender - per-lookup - - - com.dianping.cat.message.spi.MessageStatistics - m_statistics - - - com.dianping.cat.message.spi.MessageCodec - plain-text - m_codec - - - com.dianping.cat.message.io.MessageSender tcp-socket-hierarchy - com.dianping.cat.message.io.TcpSocketHierarchySender + com.dianping.cat.message.io.TcpSocketSender per-lookup diff --git a/cat-client/src/test/java/com/dianping/cat/CatTest.java b/cat-client/src/test/java/com/dianping/cat/CatTest.java index 1012de1a33130112ac4aed13dac305110795b73d..276b5aefba28423413f070946410e3cc410def2f 100644 --- a/cat-client/src/test/java/com/dianping/cat/CatTest.java +++ b/cat-client/src/test/java/com/dianping/cat/CatTest.java @@ -6,7 +6,6 @@ import org.junit.Test; import com.dianping.cat.message.Message; import com.dianping.cat.message.Trace; -import com.dianping.cat.message.spi.MessageTree; public class CatTest { @@ -30,10 +29,7 @@ public class CatTest { Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061"); Cat.logEvent("EventType", "EventName"); Cat.logHeartbeat("logHeartbeat", "logHeartbeat", Message.SUCCESS, null); - MessageTree message = Cat.getManager().getThreadLocalMessageTree(); - String str = message.toString(); - Assert.assertEquals(19, str.split("\n").length); Assert.assertEquals(true, Cat.isInitialized()); } } diff --git a/cat-client/src/test/java/com/dianping/cat/message/io/TcpSocketHierarchyTest.java b/cat-client/src/test/java/com/dianping/cat/message/io/TcpSocketHierarchyTest.java index 86bfef2dc6ef6d94c5c93dc23ea9afa42bc7e41c..56e2a6122048a7d972b81e38a13793b22acfaeb4 100644 --- a/cat-client/src/test/java/com/dianping/cat/message/io/TcpSocketHierarchyTest.java +++ b/cat-client/src/test/java/com/dianping/cat/message/io/TcpSocketHierarchyTest.java @@ -26,7 +26,7 @@ import com.dianping.cat.message.spi.internal.DefaultMessageTree; public class TcpSocketHierarchyTest extends ComponentTestCase { @Test public void test() throws Exception { - TcpSocketHierarchySender sender = (TcpSocketHierarchySender) lookup(MessageSender.class, "tcp-socket-hierarchy"); + TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class, "tcp-socket-hierarchy"); List addresses = getServerAddresses(); StringBuilder result = new StringBuilder(); ServerBootstrap bootstrap = createServerBootstrap(result); @@ -43,7 +43,7 @@ public class TcpSocketHierarchyTest extends ComponentTestCase { sender.send(new DefaultMessageTree()); - Thread.sleep(100 * 1000); + Thread.sleep(10 * 1000); } private ServerBootstrap createServerBootstrap(final StringBuilder result) { diff --git a/cat-core/src/main/java/com/dianping/cat/message/spi/core/DefaultMessagePathBuilder.java b/cat-core/src/main/java/com/dianping/cat/message/spi/core/DefaultMessagePathBuilder.java index 2273fbcadf1803ec43d927b14e528001c1cd5145..fe31d5c60177717c5468e0f0a9a7f48c2cc9cd61 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/spi/core/DefaultMessagePathBuilder.java +++ b/cat-core/src/main/java/com/dianping/cat/message/spi/core/DefaultMessagePathBuilder.java @@ -4,52 +4,19 @@ import java.io.File; import java.io.IOException; import java.text.MessageFormat; import java.util.Date; -import java.util.Locale; -import org.codehaus.plexus.logging.LogEnabled; -import org.codehaus.plexus.logging.Logger; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; import org.unidal.lookup.annotation.Inject; import com.dianping.cat.configuration.ClientConfigManager; -import com.dianping.cat.message.internal.MessageId; -public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializable, LogEnabled { +public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializable { @Inject private ClientConfigManager m_configManager; private File m_baseLogDir; - private Logger m_logger; - - @Override - public void enableLogging(Logger logger) { - m_logger = logger; - } - - @Override - public String getHdfsPath(String messageId) { - MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/{1}/{0,date,mm}-{2}", Locale.getDefault()); - - try { - MessageId id = MessageId.parse(messageId); - Date date = new Date(id.getTimestamp()); - String path = format.format(new Object[] { date, id.getDomain(), id.getIpAddressInHex() }); - - return path; - } catch (Exception e) { - m_logger.error("Error when building HDFS path for " + messageId, e); - } - - return messageId; - } - - @Override - public File getLogViewBaseDir() { - return m_baseLogDir; - } - @Override public String getPath(Date timestamp, String name) { MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/{1}"); diff --git a/cat-core/src/main/java/com/dianping/cat/message/spi/core/MessagePathBuilder.java b/cat-core/src/main/java/com/dianping/cat/message/spi/core/MessagePathBuilder.java index f50f7f225b0ee9356a78392ab3d77af90cda8fe2..1e74bf551cff3aa37898541ec8167dc7f95ff401 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/spi/core/MessagePathBuilder.java +++ b/cat-core/src/main/java/com/dianping/cat/message/spi/core/MessagePathBuilder.java @@ -1,13 +1,8 @@ package com.dianping.cat.message.spi.core; -import java.io.File; import java.util.Date; public interface MessagePathBuilder { - public String getHdfsPath(String messageId); - - public File getLogViewBaseDir(); - public String getPath(Date timestamp, String name); public String getReportPath(String name, Date timestamp); diff --git a/cat-core/src/test/java/com/dianping/cat/AllTests.java b/cat-core/src/test/java/com/dianping/cat/AllTests.java index 1818ee17252c8c46721751e8f29a23907c1b2c38..90cc7547db88bee4a7d4023f1c8faddd96f48b44 100644 --- a/cat-core/src/test/java/com/dianping/cat/AllTests.java +++ b/cat-core/src/test/java/com/dianping/cat/AllTests.java @@ -9,7 +9,6 @@ import com.dianping.cat.analysis.DefaultMessageAnalyzerManagerTest; import com.dianping.cat.analysis.PeriodStrategyTest; import com.dianping.cat.analysis.PeriodTaskTest; import com.dianping.cat.message.spi.core.HtmlMessageCodecTest; -import com.dianping.cat.message.spi.core.MessagePathBuilderTest; import com.dianping.cat.message.spi.core.TcpSocketReceiverTest; import com.dianping.cat.message.spi.core.WaterfallMessageCodecTest; import com.dianping.cat.service.ModelPeriodTest; @@ -41,8 +40,6 @@ TaskManagerTest.class, TcpSocketReceiverTest.class, -MessagePathBuilderTest.class, - ServerStatisticManagerTest.class, PeriodStrategyTest.class, diff --git a/cat-core/src/test/java/com/dianping/cat/message/spi/core/MessagePathBuilderTest.java b/cat-core/src/test/java/com/dianping/cat/message/spi/core/MessagePathBuilderTest.java deleted file mode 100644 index 0617bdcef47b1e6fddcb074e179a5492cc615553..0000000000000000000000000000000000000000 --- a/cat-core/src/test/java/com/dianping/cat/message/spi/core/MessagePathBuilderTest.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.dianping.cat.message.spi.core; - -import java.util.Locale; -import java.util.TimeZone; - -import junit.framework.Assert; - -import org.junit.Before; -import org.junit.Test; -import org.unidal.lookup.ComponentTestCase; - -public class MessagePathBuilderTest extends ComponentTestCase { - - private MessagePathBuilder m_pathBuilder; - - @Before - public void prepare() throws Exception { - m_pathBuilder = lookup(MessagePathBuilder.class); - } - - @Test - public void test_getHdfsPath() { - TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); - Locale.setDefault(Locale.CHINESE); - Assert.assertEquals("20121220/17/UNKNOWN/00-c0a82050", m_pathBuilder.getHdfsPath("UNKNOWN-c0a82050-376665-314")); - } -}