提交 cb7a0911 编写于 作者: Y yong.you

remove unused interface in message bucket

上级 de3d7adf
......@@ -109,7 +109,6 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
}
}
@Override
public void close() throws IOException {
synchronized (m_buckets) {
for (LocalMessageBucket bucket : m_buckets.values()) {
......@@ -580,7 +579,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
long current = System.currentTimeMillis() / 1000 / 60;
int min = (int) (current % (60));
// make system is 0-10 min is not busy
// make system 0-10 min is not busy
if (min > 10) {
moveOldMessages();
}
......
......@@ -6,8 +6,6 @@ import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
public interface MessageBucketManager {
public void close() throws IOException;
public MessageTree loadMessage(String messageId) throws IOException;
public void storeMessage(MessageTree tree,MessageId id) throws IOException;
......
......@@ -87,8 +87,6 @@ public class LocalMessageBucketManagerTest extends ComponentTestCase {
Assert.assertNotNull("Message " + i + " not found.", tree);
Assert.assertEquals(messageId, tree.getMessageId());
}
manager.close();
}
static class MockMessageIdFactory extends MessageIdFactory {
......
......@@ -3,15 +3,17 @@ package com.dianping.cat.hadoop.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.ContainerHolder;
......@@ -40,24 +42,28 @@ public class HdfsMessageBucketManager extends ContainerHolder implements Message
@Inject
private ServerConfigManager m_serverConfigManager;
private Map<String, HdfsMessageBucket> m_buckets = new HashMap<String, HdfsMessageBucket>();
@Override
public void close() throws IOException {
for (HdfsMessageBucket bucket : m_buckets.values()) {
bucket.close();
}
}
private Map<String, HdfsMessageBucket> m_buckets = new ConcurrentHashMap<String, HdfsMessageBucket>();
void closeIdleBuckets() throws IOException {
long now = System.currentTimeMillis();
long hour = 3600 * 1000L;
Set<String> closed = new HashSet<String>();
for (HdfsMessageBucket bucket : m_buckets.values()) {
for (Map.Entry<String, HdfsMessageBucket> entry : m_buckets.entrySet()) {
HdfsMessageBucket bucket = entry.getValue();
if (now - bucket.getLastAccessTime() >= hour) {
bucket.close();
try {
bucket.close();
closed.add(entry.getKey());
} catch (Exception e) {
Cat.logError(e);
}
}
}
for(String close:closed){
m_buckets.remove(close);
}
}
@Override
......@@ -115,7 +121,6 @@ public class HdfsMessageBucketManager extends ContainerHolder implements Message
bucket.initialize(dataFile);
m_buckets.put(dataFile, bucket);
}
if (bucket != null) {
MessageTree tree = bucket.findById(messageId);
......@@ -164,7 +169,7 @@ public class HdfsMessageBucketManager extends ContainerHolder implements Message
try {
closeIdleBuckets();
} catch (IOException e) {
e.printStackTrace();
Cat.logError(e);
}
}
} catch (InterruptedException e) {
......
......@@ -78,8 +78,6 @@ public class HdfsMessageBucketManagerTest extends ComponentTestCase {
Assert.assertNotNull("Message " + i + " not found.", tree);
Assert.assertEquals(messageId, tree.getMessageId());
}
hdfsManager.close();
}
static class MockMessageIdFactory extends MessageIdFactory {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册