提交 803e2305 编写于 作者: Y yong.you

Remove unused code: fold TcpSocketHierarchySender into TcpSocketSender, drop the unused MessagePathBuilder.getHdfsPath() and its test, and clean up the related component configuration.

上级 01163ac5
......@@ -17,7 +17,6 @@ import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.DefaultTransportManager;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.io.TcpSocketHierarchySender;
import com.dianping.cat.message.io.TcpSocketSender;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageCodec;
......@@ -48,10 +47,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
.is(PER_LOOKUP) //
.req(MessageStatistics.class, "default", "m_statistics") //
.req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec"));
all.add(C(MessageSender.class, TcpSocketHierarchySender.ID, TcpSocketHierarchySender.class) //
.is(PER_LOOKUP) //
.req(MessageStatistics.class, "default", "m_statistics") //
.req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec"));
all.add(C(TransportManager.class, DefaultTransportManager.class) //
.req(ClientConfigManager.class));
......
......@@ -57,15 +57,9 @@ public class DefaultTransportManager extends ContainerHolder implements Transpor
if (addresses.isEmpty()) {
throw new RuntimeException("All servers in configuration are disabled!\r\n" + servers);
} else if (addresses.size() == 1) {
TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class, TcpSocketSender.ID);
sender.setServerAddress(addresses.get(0));
sender.initialize();
m_sender = sender;
} else {
TcpSocketHierarchySender sender = (TcpSocketHierarchySender) lookup(MessageSender.class,
TcpSocketHierarchySender.ID);
TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class,
TcpSocketSender.ID);
sender.setServerAddresses(addresses);
sender.initialize();
......
package com.dianping.cat.message.io;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
/**
 * Asynchronous TCP sender (Netty 3) that delivers encoded {@link MessageTree}s
 * to one of several CAT servers. Server addresses are ordered by priority: the
 * inner {@link ChannelManager} connects to the first reachable address and, in
 * a background task, periodically tries to fail back to a higher-priority
 * server and to replace a closed channel.
 *
 * <p>Messages are staged in a bounded in-memory queue by {@link #send(MessageTree)}
 * and drained by this class's own {@link #run()} task; when the queue is full,
 * new messages are dropped (counted via {@link MessageStatistics#onOverflowed}).
 */
public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled {
	public static final String ID = "tcp-socket-hierarchy";

	@Inject
	private MessageCodec m_codec;

	@Inject
	private MessageStatistics m_statistics;

	// Bounded staging queue; send() never blocks, it drops on overflow.
	private MessageQueue m_queue = new DefaultMessageQueue(10000);

	// Candidate servers in priority order (index 0 is most preferred).
	private List<InetSocketAddress> m_serverAddresses;

	// Owns the active channel; reconnects/fails back in a background task.
	private ChannelManager m_manager;

	private Logger m_logger;

	// Loop flag for run(); cleared by shutdown() (or an interrupt while idle).
	private transient boolean m_active;

	// Count of messages dropped because the queue was full.
	private AtomicInteger m_errors = new AtomicInteger();

	// Count of polls that found the Netty write buffer full.
	private AtomicInteger m_attempts = new AtomicInteger();

	/**
	 * Returns true only when the given channel is open and currently writable.
	 * When the channel is open but its outbound buffer is full, logs the
	 * condition on the first occurrence and then every 1000th one.
	 */
	boolean checkWritable(ChannelFuture future) {
		boolean isWriteable = false;

		if (future != null && future.getChannel().isOpen()) {
			if (future.getChannel().isWritable()) {
				isWriteable = true;
			} else {
				int count = m_attempts.incrementAndGet();

				if (count % 1000 == 0 || count == 1) {
					m_logger.error("Netty write buffer is full! Attempts: " + count);
				}
			}
		}

		return isWriteable;
	}

	@Override
	public void enableLogging(Logger logger) {
		m_logger = logger;
	}

	@Override
	public String getName() {
		return "TcpSocketHierarchySender";
	}

	/**
	 * Creates the channel manager (which connects to the first reachable
	 * server synchronously in its constructor) and starts both the sender loop
	 * and the manager's reconnect loop in the "Cat" thread group.
	 * {@link #setServerAddresses(List)} must have been called first.
	 */
	@Override
	public void initialize() {
		m_manager = new ChannelManager(m_logger, m_serverAddresses);

		Threads.forGroup("Cat").start(this);
		Threads.forGroup("Cat").start(m_manager);
	}

	/**
	 * Sender loop: while active, polls one message from the queue and writes
	 * it whenever the current channel is writable; otherwise idles 5 ms.
	 * On exit, releases all channels and the Netty resources.
	 */
	@Override
	public void run() {
		m_active = true;

		while (m_active) {
			ChannelFuture future = m_manager.getChannel();

			if (checkWritable(future)) {
				try {
					MessageTree tree = m_queue.poll();

					if (tree != null) {
						sendInternal(tree);
						// Drop the reference to the root message so it can be GC'ed.
						tree.setMessage(null);
					}
				} catch (Throwable t) {
					m_logger.error("Error when sending message over TCP socket!", t);
				}
			} else {
				try {
					Thread.sleep(5);
				} catch (Exception e) {
					// interrupted while idle: treat as a shutdown request
					m_active = false;
				}
			}
		}

		m_manager.releaseAll();
	}

	/**
	 * Enqueues the tree without blocking. When the queue is full the message
	 * is dropped: statistics (if present) are notified, and the drop is logged
	 * on the first occurrence and then every 1000th one.
	 */
	@Override
	public void send(MessageTree tree) {
		boolean result = m_queue.offer(tree);

		if (!result) {
			if (m_statistics != null) {
				m_statistics.onOverflowed(tree);
			}

			int count = m_errors.incrementAndGet();

			if (count % 1000 == 0 || count == 1) {
				m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
			}
		}
	}

	/**
	 * Encodes the tree into a dynamic buffer (10K initial capacity) and writes
	 * it to the active channel. The write is asynchronous; its outcome is not
	 * checked here. Byte count is reported to statistics when available.
	 */
	private void sendInternal(MessageTree tree) {
		ChannelFuture future = m_manager.getChannel();
		ChannelBuffer buf = ChannelBuffers.dynamicBuffer(10 * 1024); // 10K

		m_codec.encode(tree, buf);

		int size = buf.readableBytes();

		future.getChannel().write(buf);

		if (m_statistics != null) {
			m_statistics.onBytes(size);
		}
	}

	public void setCodec(MessageCodec codec) {
		m_codec = codec;
	}

	public void setServerAddresses(List<InetSocketAddress> serverAddresses) {
		m_serverAddresses = serverAddresses;
	}

	@Override
	public void shutdown() {
		m_active = false;
		m_manager.shutdown();
	}

	/**
	 * Maintains a single active channel to the highest-priority reachable
	 * server. The constructor connects synchronously; {@link #run()} is a
	 * background loop that every 2 seconds (a) replaces a closed active
	 * channel and (b) tries to fail back to a lower-index (higher-priority)
	 * address. The superseded channel is closed lazily by {@link #getChannel()}
	 * on the sender thread.
	 */
	static class ChannelManager implements Task {
		private List<InetSocketAddress> m_serverAddresses;

		// NOTE(review): allocated to len nulls but never written to by
		// createChannel(), so releaseAll() only iterates nulls — verify that
		// non-active channels are released elsewhere (getChannel closes only
		// the most recently superseded one).
		private List<ChannelFuture> m_futures;

		private ClientBootstrap m_bootstrap;

		// Channel currently used for writes; swapped by run(), read by getChannel().
		private ChannelFuture m_activeFuture;

		// Index of m_activeFuture in m_serverAddresses; size() means "no open channel".
		private int m_activeIndex;

		private Logger m_logger;

		// Previously active channel, kept until getChannel() closes it.
		private ChannelFuture m_lastFuture;

		private boolean m_active = true;

		// Starts at 999 so the very first failed connect (count 1000) is logged.
		private AtomicInteger m_reconnects = new AtomicInteger(999);

		public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses) {
			int len = serverAddresses.size();

			m_logger = logger;
			m_serverAddresses = serverAddresses;
			m_futures = new ArrayList<ChannelFuture>(Collections.<ChannelFuture> nCopies(len, null));

			ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Boss", 10);
			ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10);
			ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
			ClientBootstrap bootstrap = new ClientBootstrap(factory);

			bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
				@Override
				public ChannelPipeline getPipeline() {
					return Channels.pipeline(new MyHandler(m_logger));
				}
			});

			bootstrap.setOption("tcpNoDelay", true);
			bootstrap.setOption("keepAlive", true);
			m_bootstrap = bootstrap;

			// Connect to the first reachable server, in priority order.
			for (int i = 0; i < len; i++) {
				ChannelFuture future = createChannel(i);

				if (future != null) {
					m_activeFuture = future;
					m_activeIndex = i;
					break;
				}
			}
		}

		/**
		 * Attempts to connect to the server at the given index, waiting at
		 * most 100 ms. Returns the connected future, or null on failure
		 * (failures are logged once every 1000 attempts).
		 */
		ChannelFuture createChannel(int index) {
			InetSocketAddress address = m_serverAddresses.get(index);
			ChannelFuture future = m_bootstrap.connect(address);

			future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms

			if (!future.isSuccess()) {
				future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms

				int count = m_reconnects.incrementAndGet();

				if (count % 1000 == 0) {
					m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause());
				}
				return null;
			} else {
				m_logger.info("Connected to CAT server at " + address);
				return future;
			}
		}

		/**
		 * Returns the active channel future (may be null when no server is
		 * reachable). As a side effect, closes the previously active channel
		 * once a newer one has taken over.
		 */
		public ChannelFuture getChannel() {
			if (m_lastFuture != null && m_lastFuture != m_activeFuture) {
				m_lastFuture.getChannel().close();
				m_lastFuture = null;
			}

			return m_activeFuture;
		}

		@Override
		public String getName() {
			return "TcpSocketHierarchySender-ChannelManager";
		}

		/**
		 * Waits briefly for tracked channels to close, then releases the
		 * Netty factory's external resources (thread pools).
		 */
		public void releaseAll() {
			for (ChannelFuture future : m_futures) {
				if (future != null) {
					future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
				}
			}

			m_bootstrap.getFactory().releaseExternalResources();
			m_futures = null;
		}

		@Override
		public void run() {
			try {
				while (m_active) {
					try {
						// If the active channel dropped, allow scanning the whole list below.
						if (m_activeFuture != null && !m_activeFuture.getChannel().isOpen()) {
							m_activeIndex = m_serverAddresses.size();
						}

						// Try every address with higher priority than the current one;
						// on success, swap it in and let getChannel() close the old channel.
						for (int i = 0; i < m_activeIndex; i++) {
							ChannelFuture future = createChannel(i);

							if (future != null) {
								m_lastFuture = m_activeFuture;
								m_activeFuture = future;
								m_activeIndex = i;
								break;
							}
						}
					} catch (Throwable e) {
						Cat.logError(e);
					}

					Thread.sleep(2 * 1000L); // check every 2 seconds
				}
			} catch (InterruptedException e) {
				// interrupted during sleep: exit the reconnect loop
			}
		}

		@Override
		public void shutdown() {
			m_active = false;
		}
	}

	/**
	 * Channel handler that logs remote disconnects and closes the channel on
	 * any exception (the ChannelManager loop will then reconnect).
	 */
	static class MyHandler extends SimpleChannelHandler {
		private Logger m_logger;

		public MyHandler(Logger logger) {
			m_logger = logger;
		}

		@Override
		public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
			m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress());
		}

		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
			e.getChannel().close();
		}
	}
}
package com.dianping.cat.message.io;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -24,33 +27,26 @@ import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
public class TcpSocketSender implements Task, MessageSender, LogEnabled {
public static final String ID = "tcp-socket";
public static final String ID = "tcp-socket-hierarchy";
@Inject
private MessageCodec m_codec;
@Inject
private MessageStatistics m_statistics;
private MessageQueue m_queue = new DefaultMessageQueue(10000);;
private InetSocketAddress m_serverAddress;
private ChannelFactory m_factory;
private ChannelFuture m_future;
private MessageQueue m_queue = new DefaultMessageQueue(100000);
private ClientBootstrap m_bootstrap;
private List<InetSocketAddress> m_serverAddresses;
private int m_reconnectPeriod = 5000; // every 5 seconds
private long m_lastReconnectTime;
private ChannelManager m_manager;
private Logger m_logger;
......@@ -60,8 +56,6 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
private AtomicInteger m_attempts = new AtomicInteger();
private AtomicInteger m_reconnects = new AtomicInteger();
boolean checkWritable(ChannelFuture future) {
boolean isWriteable = false;
......@@ -87,69 +81,15 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
@Override
public String getName() {
return "TcpSocketSender";
return "TcpSocketHierarchySender";
}
@Override
public void initialize() {
if (m_serverAddress == null) {
throw new RuntimeException("No server address was configured for TcpSocketSender!");
}
ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool(
"Cat-TcpSocketSender-Boss-" + m_serverAddress, 10);
ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10);
ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(new ExceptionHandler());
}
});
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
ChannelFuture future = bootstrap.connect(m_serverAddress);
future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms
if (!future.isSuccess()) {
m_logger.error("Error when connecting to " + m_serverAddress, future.getCause());
} else {
m_factory = factory;
m_future = future;
m_logger.info("Connected to CAT server at " + m_serverAddress);
}
m_manager = new ChannelManager(m_logger, m_serverAddresses);
m_bootstrap = bootstrap;
Threads.forGroup("Cat").start(this);
}
public void reconnect() {
long now = System.currentTimeMillis();
if (m_lastReconnectTime > 0 && m_lastReconnectTime + m_reconnectPeriod > now) {
return;
}
m_lastReconnectTime = now;
ChannelFuture future = m_bootstrap.connect(m_serverAddress);
future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
if (!future.isSuccess()) {
int count = m_reconnects.incrementAndGet();
if (count % 1000 == 0 || count == 1) {
m_logger.error("Error when reconnecting to " + m_serverAddress, future.getCause());
}
} else {
m_future = future;
m_logger.info("Reconnected to CAT server at " + m_serverAddress);
}
Threads.forGroup("Cat").start(m_manager);
}
@Override
......@@ -157,38 +97,30 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
m_active = true;
while (m_active) {
try {
if (checkWritable(m_future)) {
ChannelFuture future = m_manager.getChannel();
if (checkWritable(future)) {
try {
MessageTree tree = m_queue.poll();
if (tree != null) {
sendInternal(tree);
tree.setMessage(null);
}
} else {
try {
Thread.sleep(2);
} catch (Exception e) {
break;
}
if (m_future == null || !m_future.getChannel().isOpen()) {
reconnect();
}
} catch (Throwable t) {
m_logger.error("Error when sending message over TCP socket!", t);
}
} else {
try {
Thread.sleep(5);
} catch (Exception e) {
// ignore it
m_active = false;
}
} catch (Throwable t) {
m_logger.error("Error when sending message over TCP socket!", t);
}
}
if (m_future != null) {
m_future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
}
if (m_factory != null) {
m_factory.releaseExternalResources();
}
m_manager.releaseAll();
}
@Override
......@@ -209,22 +141,17 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
}
private void sendInternal(MessageTree tree) {
if (m_future == null || !m_future.getChannel().isOpen()) {
reconnect();
}
if (m_future != null && m_future.getChannel().isOpen()) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8 * 1024); // 8K
ChannelFuture future = m_manager.getChannel();
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(10 * 1024); // 10K
m_codec.encode(tree, buf);
m_codec.encode(tree, buf);
int size = buf.readableBytes();
int size = buf.readableBytes();
m_future.getChannel().write(buf);
future.getChannel().write(buf);
if (m_statistics != null) {
m_statistics.onBytes(size);
}
if (m_statistics != null) {
m_statistics.onBytes(size);
}
}
......@@ -232,20 +159,158 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
m_codec = codec;
}
public void setReconnectPeriod(int reconnectPeriod) {
m_reconnectPeriod = reconnectPeriod;
}
public void setServerAddress(InetSocketAddress serverAddress) {
m_serverAddress = serverAddress;
public void setServerAddresses(List<InetSocketAddress> serverAddresses) {
m_serverAddresses = serverAddresses;
}
@Override
public void shutdown() {
m_active = false;
m_manager.shutdown();
}
static class ChannelManager implements Task {
private List<InetSocketAddress> m_serverAddresses;
private List<ChannelFuture> m_futures;
private ClientBootstrap m_bootstrap;
private ChannelFuture m_activeFuture;
private int m_activeIndex;
private Logger m_logger;
private ChannelFuture m_lastFuture;
private boolean m_active = true;
private AtomicInteger m_reconnects = new AtomicInteger(999);
public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses) {
int len = serverAddresses.size();
m_logger = logger;
m_serverAddresses = serverAddresses;
m_futures = new ArrayList<ChannelFuture>(Collections.<ChannelFuture> nCopies(len, null));
ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Boss", 10);
ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10);
ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
return Channels.pipeline(new ExceptionHandler(m_logger));
}
});
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
m_bootstrap = bootstrap;
for (int i = 0; i < len; i++) {
ChannelFuture future = createChannel(i);
if (future != null) {
m_activeFuture = future;
m_activeIndex = i;
break;
}
}
}
ChannelFuture createChannel(int index) {
InetSocketAddress address = m_serverAddresses.get(index);
ChannelFuture future = m_bootstrap.connect(address);
future.awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100 ms
if (!future.isSuccess()) {
future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
int count = m_reconnects.incrementAndGet();
if (count % 1000 == 0) {
m_logger.error("Error when try to connecting to " + address + ", message: " + future.getCause());
}
return null;
} else {
m_logger.info("Connected to CAT server at " + address);
return future;
}
}
public ChannelFuture getChannel() {
if (m_lastFuture != null && m_lastFuture != m_activeFuture) {
m_lastFuture.getChannel().close();
m_lastFuture = null;
}
return m_activeFuture;
}
@Override
public String getName() {
return "TcpSocketHierarchySender-ChannelManager";
}
public void releaseAll() {
for (ChannelFuture future : m_futures) {
if (future != null) {
future.getChannel().getCloseFuture().awaitUninterruptibly(100, TimeUnit.MILLISECONDS); // 100ms
}
}
m_bootstrap.getFactory().releaseExternalResources();
m_futures = null;
}
@Override
public void run() {
try {
while (m_active) {
try {
if (m_activeFuture != null && !m_activeFuture.getChannel().isOpen()) {
m_activeIndex = m_serverAddresses.size();
}
for (int i = 0; i < m_activeIndex; i++) {
ChannelFuture future = createChannel(i);
if (future != null) {
m_lastFuture = m_activeFuture;
m_activeFuture = future;
m_activeIndex = i;
break;
}
}
} catch (Throwable e) {
Cat.logError(e);
}
Thread.sleep(2 * 1000L); // check every 2 seconds
}
} catch (InterruptedException e) {
// ignore
}
}
@Override
public void shutdown() {
m_active = false;
}
}
class ExceptionHandler extends SimpleChannelHandler {
static class ExceptionHandler extends SimpleChannelHandler {
private Logger m_logger;
public ExceptionHandler(Logger logger) {
m_logger = logger;
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
m_logger.warn("Channel disconnected by remote address: " + e.getChannel().getRemoteAddress());
......
......@@ -35,27 +35,10 @@
<role>com.dianping.cat.message.internal.MessageIdFactory</role>
<implementation>com.dianping.cat.message.internal.MessageIdFactory</implementation>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>tcp-socket</role-hint>
<implementation>com.dianping.cat.message.io.TcpSocketSender</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageStatistics</role>
<field-name>m_statistics</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
<field-name>m_codec</field-name>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>tcp-socket-hierarchy</role-hint>
<implementation>com.dianping.cat.message.io.TcpSocketHierarchySender</implementation>
<implementation>com.dianping.cat.message.io.TcpSocketSender</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
......
......@@ -6,7 +6,6 @@ import org.junit.Test;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Trace;
import com.dianping.cat.message.spi.MessageTree;
public class CatTest {
......@@ -30,10 +29,7 @@ public class CatTest {
Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");
Cat.logEvent("EventType", "EventName");
Cat.logHeartbeat("logHeartbeat", "logHeartbeat", Message.SUCCESS, null);
MessageTree message = Cat.getManager().getThreadLocalMessageTree();
String str = message.toString();
Assert.assertEquals(19, str.split("\n").length);
Assert.assertEquals(true, Cat.isInitialized());
}
}
......@@ -26,7 +26,7 @@ import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class TcpSocketHierarchyTest extends ComponentTestCase {
@Test
public void test() throws Exception {
TcpSocketHierarchySender sender = (TcpSocketHierarchySender) lookup(MessageSender.class, "tcp-socket-hierarchy");
TcpSocketSender sender = (TcpSocketSender) lookup(MessageSender.class, "tcp-socket-hierarchy");
List<InetSocketAddress> addresses = getServerAddresses();
StringBuilder result = new StringBuilder();
ServerBootstrap bootstrap = createServerBootstrap(result);
......@@ -43,7 +43,7 @@ public class TcpSocketHierarchyTest extends ComponentTestCase {
sender.send(new DefaultMessageTree());
Thread.sleep(100 * 1000);
Thread.sleep(10 * 1000);
}
private ServerBootstrap createServerBootstrap(final StringBuilder result) {
......
......@@ -4,52 +4,19 @@ import java.io.File;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Locale;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.internal.MessageId;
public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializable, LogEnabled {
public class DefaultMessagePathBuilder implements MessagePathBuilder, Initializable {
@Inject
private ClientConfigManager m_configManager;
private File m_baseLogDir;
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public String getHdfsPath(String messageId) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/{1}/{0,date,mm}-{2}", Locale.getDefault());
try {
MessageId id = MessageId.parse(messageId);
Date date = new Date(id.getTimestamp());
String path = format.format(new Object[] { date, id.getDomain(), id.getIpAddressInHex() });
return path;
} catch (Exception e) {
m_logger.error("Error when building HDFS path for " + messageId, e);
}
return messageId;
}
@Override
public File getLogViewBaseDir() {
return m_baseLogDir;
}
@Override
public String getPath(Date timestamp, String name) {
MessageFormat format = new MessageFormat("{0,date,yyyyMMdd}/{0,date,HH}/{1}");
......
package com.dianping.cat.message.spi.core;
import java.io.File;
import java.util.Date;
public interface MessagePathBuilder {
public String getHdfsPath(String messageId);
public File getLogViewBaseDir();
public String getPath(Date timestamp, String name);
public String getReportPath(String name, Date timestamp);
......
......@@ -9,7 +9,6 @@ import com.dianping.cat.analysis.DefaultMessageAnalyzerManagerTest;
import com.dianping.cat.analysis.PeriodStrategyTest;
import com.dianping.cat.analysis.PeriodTaskTest;
import com.dianping.cat.message.spi.core.HtmlMessageCodecTest;
import com.dianping.cat.message.spi.core.MessagePathBuilderTest;
import com.dianping.cat.message.spi.core.TcpSocketReceiverTest;
import com.dianping.cat.message.spi.core.WaterfallMessageCodecTest;
import com.dianping.cat.service.ModelPeriodTest;
......@@ -41,8 +40,6 @@ TaskManagerTest.class,
TcpSocketReceiverTest.class,
MessagePathBuilderTest.class,
ServerStatisticManagerTest.class,
PeriodStrategyTest.class,
......
package com.dianping.cat.message.spi.core;
import java.util.Locale;
import java.util.TimeZone;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.unidal.lookup.ComponentTestCase;
/**
 * Verifies that {@code MessagePathBuilder.getHdfsPath(String)} maps a message
 * id onto the expected date-based HDFS directory layout.
 */
public class MessagePathBuilderTest extends ComponentTestCase {
	private MessagePathBuilder m_pathBuilder;

	@Before
	public void prepare() throws Exception {
		// Resolve the component under test from the container.
		m_pathBuilder = lookup(MessagePathBuilder.class);
	}

	@Test
	public void test_getHdfsPath() {
		// Pin time zone and locale so the date-derived path segments are deterministic.
		TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
		Locale.setDefault(Locale.CHINESE);

		String messageId = "UNKNOWN-c0a82050-376665-314";
		String expectedPath = "20121220/17/UNKNOWN/00-c0a82050";

		Assert.assertEquals(expectedPath, m_pathBuilder.getHdfsPath(messageId));
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册