提交 1820ec5b 编写于 作者: Y yong.you

modify the message queue

上级 7106ba1b
......@@ -30,6 +30,7 @@ import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.spi.AbstractMessageAnalyzer;
import com.dianping.cat.message.spi.MessageAnalyzer;
import com.dianping.cat.message.spi.MessageConsumer;
......@@ -211,7 +212,7 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
for (String name : m_analyzerNames) {
MessageAnalyzer analyzer = m_factory.create(name, startTime, m_duration, m_extraTime);
MessageQueue queue = lookup(MessageQueue.class);
MessageQueue queue = new DefaultMessageQueue();
PeriodTask task = new PeriodTask(m_factory, analyzer, queue, startTime);
analyzers.put(name, analyzer);
......@@ -360,17 +361,21 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
try {
while (m_active) {
long now = System.currentTimeMillis();
long value = m_strategy.next(now);
if (value == 0) {
// do nothing here
} else if (value > 0) {
// prepare next period in ahead of 3 minutes
startPeriod(value);
} else {
// last period is over
endPeriod(-value);
try {
long now = System.currentTimeMillis();
long value = m_strategy.next(now);
if (value == 0) {
// do nothing here
} else if (value > 0) {
// prepare next period in ahead of 3 minutes
startPeriod(value);
} else {
// last period is over
endPeriod(-value);
}
} catch (Throwable e) {
Cat.logError(e);
}
Thread.sleep(1000L);
......
......@@ -18,11 +18,9 @@ public class DumpAnalyzerTest extends ComponentTestCase {
public void test() throws Exception {
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
long now = 1334041324150L;
DefaultMessageQueue queue = new DefaultMessageQueue();
int num = 1000000;
DefaultMessageQueue queue = new DefaultMessageQueue(num);
queue.setSize(num);
queue.initialize();
for (int i = 0; i < num; i++) {
queue.offer(newMessageTree(i, now + i * 10L));
......
......@@ -29,11 +29,8 @@ public class DumpUploaderSample extends ComponentTestCase {
public void testUpload() throws Exception {
AnalyzerFactory factory = lookup(AnalyzerFactory.class);
long now = System.currentTimeMillis();
DefaultMessageQueue queue = new DefaultMessageQueue();
int num = 10000;
queue.setSize(num);
queue.initialize();
DefaultMessageQueue queue = new DefaultMessageQueue(num);
for (int i = 0; i < num; i++) {
queue.offer(newMessageTree(i, now + i * 10L));
......
......@@ -3,6 +3,10 @@ package com.dianping.cat.build;
import java.util.ArrayList;
import java.util.List;
import org.unidal.initialization.Module;
import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.CatCoreModule;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.configuration.ServerConfigManager;
......@@ -10,7 +14,6 @@ import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.internal.DefaultMessageManager;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.io.DefaultTransportManager;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.io.TcpSocketHierarchySender;
......@@ -23,7 +26,6 @@ import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.dianping.cat.message.spi.MessageHandler;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.codec.HtmlMessageCodec;
......@@ -42,9 +44,6 @@ import com.dianping.cat.storage.dump.LocalMessageBucket;
import com.dianping.cat.storage.dump.LocalMessageBucketManager;
import com.dianping.cat.storage.dump.MessageBucket;
import com.dianping.cat.storage.dump.MessageBucketManager;
import org.unidal.initialization.Module;
import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override
......@@ -73,20 +72,14 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
.req(MessageConsumer.class, new String[] { DummyConsumer.ID }, "m_consumers"));
all.add(C(MessageQueue.class, DefaultMessageQueue.class) //
.config(E("size").value("10000")) //
.is(PER_LOOKUP));
all.add(C(MessageSender.class, "tcp-socket", TcpSocketSender.class) //
.is(PER_LOOKUP) //
.req(MessageStatistics.class, "default", "m_statistics") //
.req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec")//
.req(MessageQueue.class, "default", "m_queue"));
.req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec"));
all.add(C(MessageSender.class, "tcp-socket-hierarchy", TcpSocketHierarchySender.class) //
.is(PER_LOOKUP) //
.req(MessageStatistics.class, "default", "m_statistics") //
.req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec")//
.req(MessageQueue.class, "default", "m_queue"));
.req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec"));
all.add(C(TcpSocketReceiver.class) //
.req(MessageCodec.class, PlainTextMessageCodec.ID)//
.req(ServerConfigManager.class, MessageHandler.class)//
......
......@@ -4,25 +4,28 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
import org.unidal.lookup.annotation.Inject;
public class DefaultMessageQueue implements MessageQueue, Initializable {
public class DefaultMessageQueue implements MessageQueue {
private BlockingQueue<MessageTree> m_queue;
@Inject
private int m_size;
@Override
public void initialize() throws InitializationException {
if (m_size > 0) {
m_queue = new LinkedBlockingQueue<MessageTree>(m_size);
private static final int SIZE = 500000;
public DefaultMessageQueue() {
this(SIZE);
}
public DefaultMessageQueue(int size) {
if (size > 0) {
m_queue = new LinkedBlockingQueue<MessageTree>(size);
} else {
m_queue = new LinkedBlockingQueue<MessageTree>(500000);
m_queue = new LinkedBlockingQueue<MessageTree>(SIZE);
}
}
......
......@@ -35,12 +35,11 @@ public class TcpSocketHierarchySender implements Task, MessageSender, LogEnabled
@Inject
private MessageCodec m_codec;
@Inject
private MessageQueue m_queue;
@Inject
private MessageStatistics m_statistics;
private MessageQueue m_queue = new DefaultMessageQueue(10000);
private List<InetSocketAddress> m_serverAddresses;
private ChannelManager m_manager;
......
......@@ -32,12 +32,11 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
@Inject
private MessageCodec m_codec;
@Inject
private MessageQueue m_queue;
@Inject
private MessageStatistics m_statistics;
private MessageQueue m_queue = new DefaultMessageQueue(10000);;
private InetSocketAddress m_serverAddress;
private ChannelFactory m_factory;
......@@ -70,7 +69,6 @@ public class TcpSocketSender implements Task, MessageSender, LogEnabled {
if (m_serverAddress == null) {
throw new RuntimeException("No server address was configured for TcpSocketSender!");
}
ExecutorService bossExecutor = Threads.forPool().getFixedThreadPool(
"Cat-TcpSocketSender-Boss-" + m_serverAddress, 10);
ExecutorService workerExecutor = Threads.forPool().getFixedThreadPool("Cat-TcpSocketSender-Worker", 10);
......
......@@ -121,7 +121,7 @@ public class LocalMessageBucket implements MessageBucket, LogEnabled {
boolean b = m_dirty.get();
if (b) {
synchronized (this) {
synchronized (m_out) {
m_out.close();
byte[] data = m_buf.toByteArray();
......
......@@ -182,7 +182,9 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
bucket = (LocalMessageBucket) lookup(MessageBucket.class, LocalMessageBucket.ID);
bucket.setBaseDir(m_baseDir);
bucket.initialize(dataFile);
m_buckets.put(dataFile, bucket);
synchronized (m_buckets) {
m_buckets.put(dataFile, bucket);
}
}
}
......@@ -258,7 +260,9 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
Cat.logError(e);
m_logger.error(e.getMessage(), e);
}
m_buckets.remove(path);
synchronized (m_buckets) {
m_buckets.remove(path);
}
} else {
try {
moveFile(path);
......@@ -435,7 +439,9 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
bucket = (LocalMessageBucket) lookup(MessageBucket.class, LocalMessageBucket.ID);
bucket.setBaseDir(m_baseDir);
bucket.initialize(dataFile);
m_buckets.put(dataFile, bucket);
synchronized (m_buckets) {
m_buckets.put(dataFile, bucket);
}
}
DefaultMessageTree tree = (DefaultMessageTree) item.getTree();
......@@ -542,7 +548,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
m_serverStateManager.addBlockTotal(1);
if ((++m_success) % 10000 == 0) {
int size = m_messageBlocks.size();
if (size > 0) {
m_logger.info("block queue size " + size);
}
......
......@@ -98,14 +98,6 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<implementation>com.dianping.cat.message.io.DefaultMessageQueue</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<size>10000</size>
</configuration>
</component>
<component>
<role>com.dianping.cat.message.io.MessageSender</role>
<role-hint>tcp-socket</role-hint>
......@@ -121,10 +113,6 @@
<role-hint>plain-text</role-hint>
<field-name>m_codec</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<field-name>m_queue</field-name>
</requirement>
</requirements>
</component>
<component>
......@@ -142,10 +130,6 @@
<role-hint>plain-text</role-hint>
<field-name>m_codec</field-name>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.MessageQueue</role>
<field-name>m_queue</field-name>
</requirement>
</requirements>
</component>
<component>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册