提交 be354861 编写于 作者: Y yong.you

remove unused code and modify the thread catch exception to throwable

上级 b78e3abb
......@@ -27,14 +27,11 @@ import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.codec.HtmlMessageCodec;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
import com.dianping.cat.message.spi.internal.DefaultMessageHandler;
import com.dianping.cat.message.spi.internal.DefaultMessagePathBuilder;
import com.dianping.cat.message.spi.internal.DefaultMessageStatistics;
import com.dianping.cat.message.spi.internal.DefaultMessageStorage;
import com.dianping.cat.message.spi.internal.DummyConsumer;
import com.dianping.cat.status.ServerStateManager;
import com.dianping.cat.status.StatusUpdateTask;
......@@ -61,9 +58,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessagePathBuilder.class, DefaultMessagePathBuilder.class) //
.req(ClientConfigManager.class));
all.add(C(MessageStorage.class, "html", DefaultMessageStorage.class) //
.req(MessagePathBuilder.class) //
.req(MessageCodec.class, HtmlMessageCodec.ID));
all.add(C(MessageConsumer.class, DummyConsumer.ID, DummyConsumer.class));
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers"));
......
......@@ -103,7 +103,7 @@ public class ClientConfigReloader implements Task {
}
}
}
} catch (Exception e) {
} catch (Throwable e) {
e.printStackTrace();
}
Thread.sleep(2000L);
......
package com.dianping.cat.configuration;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Files;
import org.unidal.helper.Threads.Task;
import com.dianping.cat.configuration.server.entity.ConsoleConfig;
import com.dianping.cat.configuration.server.entity.Domain;
......@@ -26,8 +23,6 @@ public class ServerConfigManager implements LogEnabled {
private ServerConfig m_config;
private List<ServiceConfigSupport> m_listeners = new ArrayList<ServerConfigManager.ServiceConfigSupport>();
private Logger m_logger;
@Override
......@@ -235,12 +230,6 @@ public class ServerConfigManager implements LogEnabled {
}
}
public void onRefresh(ServiceConfigSupport listener) {
if (!m_listeners.contains(listener)) {
m_listeners.add(listener);
}
}
private long toLong(String str, long defaultValue) {
long value = 0;
int len = str == null ? 0 : str.length();
......@@ -267,53 +256,4 @@ public class ServerConfigManager implements LogEnabled {
public static interface ServerConfigKey {
public void add(String section);
}
static class ServerConfigReloader implements Task {
private File m_file;
private volatile boolean m_active = true;
public ServerConfigReloader(File file) {
m_file = file;
}
@Override
public String getName() {
return "ServerConfigReloader";
}
private boolean isActive() {
synchronized (this) {
return m_active;
}
}
@Override
public void run() {
while (isActive()) {
try {
if (m_file.exists()) {
// TODO
}
Thread.sleep(2000L);
} catch (InterruptedException e) {
m_active = false;
}
}
}
@Override
public void shutdown() {
synchronized (this) {
m_active = false;
}
}
}
public static interface ServiceConfigSupport {
public void buildKey(ServerConfigManager manager, ServerConfigKey key);
public void configure(ServerConfigManager manager, boolean firstTime);
}
}
......@@ -27,6 +27,7 @@ import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
......@@ -271,6 +272,7 @@ public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled
public void run() {
try {
while (m_active) {
try {
if (m_activeFuture != null && !m_activeFuture.getChannel().isOpen()) {
m_activeIndex = m_serverAddresses.size();
}
......@@ -284,6 +286,9 @@ public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled
m_activeIndex = i;
break;
}
}
} catch (Throwable e) {
Cat.logError(e);
}
Thread.sleep(2 * 1000L); // check every 2 seconds
......
package com.dianping.cat.message.spi;
/**
* @author franke.wu
* @author sean.wang
* @since Mar 6, 2012
*/
public interface MessageStorage {
/**
* Store a message tree to the storage.
*
* @param tree
* message tree to store
* @return relative path to base directory or base URL
*/
public String store(MessageTree tree, String... tag);
/**
* Fetch a message tree from the store.
*
* @param messageId
* @return
*/
public MessageTree get(String messageId);
/**
* @param messageId
* @return
*/
public MessageTree next(String messageId, String tag);
/**
* @param messageId
* @return
*/
public MessageTree previous(String messageId, String tag);
/**
*
* Get relative path to base directory or base URL.
*
* @param tree
* @return relative path to base directory or base URL
*/
public String getPath(MessageTree tree);
}
package com.dianping.cat.message.spi.internal;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageStorage implements MessageStorage, Initializable, LogEnabled {
@Inject
private MessagePathBuilder m_builder;
@Inject
private MessageCodec m_codec;
private WriteJob m_job = new WriteJob();
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public MessageTree get(String messageId) {
return null;
}
@Override
public String getPath(MessageTree tree) {
return tree.getMessageId();
}
@Override
public void initialize() throws InitializationException {
Threads.forGroup("Cat").start(m_job);
}
@Override
public MessageTree next(String messageId, String tag) {
return null;
}
@Override
public MessageTree previous(String messageId, String tag) {
return null;
}
@Override
public String store(MessageTree tree, String... tags) {
String path = tree.getMessageId();
m_job.append(tree);
return path;
}
class WriteJob implements Task {
private BlockingQueue<MessageTree> m_queue = new LinkedBlockingQueue<MessageTree>();
private boolean m_active = true;
public void append(MessageTree tree) {
try {
m_queue.offer(tree);
} catch (Exception e) {
m_logger.warn("Error when adding job to queue.", e);
}
}
@Override
public String getName() {
return "DefaultMessageStorage";
}
private void handle(MessageTree tree) {
String path = tree.getMessageId();
File file = new File(m_builder.getLogViewBaseDir(), path);
if (!file.exists()) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(8192);
FileOutputStream fos = null;
file.getParentFile().mkdirs();
try {
fos = new FileOutputStream(file);
m_codec.encode(tree, buf);
int length = buf.readInt();
buf.getBytes(buf.readerIndex(), fos, length);
} catch (IOException e) {
m_logger.error(String.format("Error when writing to file(%s)!", file), e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
// ignore it
}
}
}
}
}
@Override
public void run() {
try {
while (m_active) {
MessageTree tree = m_queue.poll(1000 * 1000L, TimeUnit.NANOSECONDS);
if (tree != null) {
handle(tree);
}
}
// process the remaining job in the queue
while (!m_active) {
MessageTree tree = m_queue.poll();
if (tree != null) {
handle(tree);
} else {
break;
}
}
} catch (Exception e) {
m_logger.warn("Error when writing message to local file system.", e);
}
}
@Override
public void shutdown() {
m_active = false;
}
}
}
......@@ -459,7 +459,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
if (t != null) {
t.setStatus(Message.SUCCESS);
}
} catch (Exception e) {
} catch (Throwable e) {
Cat.logError(e);
if (t != null) {
t.setStatus(e);
......
......@@ -52,20 +52,6 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
<role-hint>html</role-hint>
<implementation>com.dianping.cat.message.spi.internal.DefaultMessageStorage</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>html</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageConsumer</role>
<role-hint>dummy</role-hint>
......
package com.dianping.cat.hadoop.sql;
import java.io.File;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.unidal.helper.Stringizers;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
@RunWith(JUnit4.class)
public class SqlJobDataProduceTest extends ComponentTestCase {
@After
public void after() throws Exception {
Cat.reset();
Cat.destroy();
}
@Before
public void before() throws Exception {
Cat.initialize(getContainer(), new File(getClass().getResource("cat.xml").getPath()));
Cat.setup(null);
}
@Test
public void test() throws Exception {
MessageStorage storage = lookup(MessageStorage.class);
MessageProducer producer = lookup(MessageProducer.class);
MockTransportManager transport = (MockTransportManager) lookup(TransportManager.class);
long currentTimeMillis = System.currentTimeMillis();
long currentHour = currentTimeMillis - currentTimeMillis % (60 * 60 * 1000);
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 12000; j++) {
Transaction t = producer.newTransaction("URL", "MyPage" + (int) (j / 500));
try {
// do your business here
t.addData("k1", "v1");
t.addData("k2", "v2");
t.addData("k3", "v3");
Thread.sleep(1);
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
producer.logEvent("URL", "Payload", Message.SUCCESS, "host=my-host&ip=127.0.0.1&agent=...");
String sqlName = "Project.insert" + j / 500;
String sqlParaMeter = "SQLParaMeter" + j / 500;
String sqlStatement = "select * from table where id=\"1\"\n order by id desc";
Transaction sqlTran = producer.newTransaction("SQL", sqlName);
producer
.logEvent("SQL.PARAM", sqlParaMeter, Transaction.SUCCESS, Stringizers.forJson().compact().from(sqlParaMeter));
sqlTran.addData(sqlStatement);
sqlTran.complete();
DefaultTransaction sqlInternalTran = (DefaultTransaction) sqlTran;
sqlInternalTran.setDurationInMillis((long) Math.pow(2, j % 12));
if (j % 2 != 0) {
sqlTran.setStatus(Message.SUCCESS);
} else {
sqlTran.setStatus("Error");
}
sqlInternalTran.setTimestamp(currentHour + (j % 60) * 1000 * 60);
DefaultTransaction def = (DefaultTransaction) sqlTran;
def.setDurationInMillis(j % 100 + 50);
def.setTimestamp(currentHour + (j % 60) * 1000 * 60);
t.setStatus(Message.SUCCESS);
} catch (Exception e) {
t.setStatus(e);
} finally {
t.complete();
}
MessageTree tree = transport.getQueue().poll();
Assert.assertNotNull("No message found in the queue!", tree);
tree.setDomain("domain" + i);
storage.store(tree);
}
}
}
}
package com.dianping.cat.hadoop.sql;
import java.util.ArrayList;
import java.util.List;
import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.message.io.TransportManager;
public class SqlJobDataProduceTestConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(TransportManager.class, MockTransportManager.class));
return all;
}
@Override
protected Class<?> getTestClass() {
return SqlJobDataProduceTest.class;
}
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new SqlJobDataProduceTestConfigurator());
}
}
......@@ -47,7 +47,7 @@ public class AlarmTask implements Task {
try {
processExceptionRule();
processServiceRule();
} catch (Exception e) {
} catch (Throwable e) {
Cat.logError(e);
}
......
......@@ -121,7 +121,7 @@ public class AlertManager implements Initializable, LogEnabled {
active = false;
}
}
} catch (Exception e) {
} catch (Throwable e) {
Cat.logError(e);
}
}
......
......@@ -89,13 +89,9 @@ public class ThresholdAlarmMeta {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(400);
sb.append('[').append("RuleId:").append(m_ruleId).append(";");
sb.append("Type:").append(m_type).append(";");
sb.append("RealCount:").append(m_realCount).append(";");
sb.append("Duration:").append(m_duration).append(']');
return sb.toString();
return "ThresholdAlarmMeta [m_baseUrl=" + m_baseUrl + ", m_date=" + m_date + ", m_domain=" + m_domain
+ ", m_duration=" + m_duration + ", m_realCount=" + m_realCount + ", m_ruleId=" + m_ruleId + ", m_type="
+ m_type + "]";
}
}
......@@ -153,7 +153,7 @@ public class ScheduledMailTask implements Task {
} else {
Cat.getProducer().logEvent("ScheduledReport", "SendNot", Event.SUCCESS, null);
}
} catch (Exception e) {
} catch (Throwable e) {
Cat.logError(e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册