diff --git a/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java index 0e2fda5eb1ca9d7c97851d12a0597a95ba7d3f98..6eca2398aa81f50ec7c97af71f428a840df07f40 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/dump/LocalMessageBucketManager.java @@ -109,7 +109,6 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag } } - @Override public void close() throws IOException { synchronized (m_buckets) { for (LocalMessageBucket bucket : m_buckets.values()) { @@ -580,7 +579,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag long current = System.currentTimeMillis() / 1000 / 60; int min = (int) (current % (60)); - // make system is 0-10 min is not busy + // make system 0-10 min is not busy if (min > 10) { moveOldMessages(); } diff --git a/cat-core/src/main/java/com/dianping/cat/storage/dump/MessageBucketManager.java b/cat-core/src/main/java/com/dianping/cat/storage/dump/MessageBucketManager.java index a1b63db4ffec7ed3d761aa6857f2d2f466c89715..4d807be9e5e68067b79271507a1c0fe7c56e1f9e 100644 --- a/cat-core/src/main/java/com/dianping/cat/storage/dump/MessageBucketManager.java +++ b/cat-core/src/main/java/com/dianping/cat/storage/dump/MessageBucketManager.java @@ -6,8 +6,6 @@ import com.dianping.cat.message.internal.MessageId; import com.dianping.cat.message.spi.MessageTree; public interface MessageBucketManager { - public void close() throws IOException; - public MessageTree loadMessage(String messageId) throws IOException; public void storeMessage(MessageTree tree,MessageId id) throws IOException; diff --git a/cat-core/src/test/java/com/dianping/cat/storage/dump/LocalMessageBucketManagerTest.java b/cat-core/src/test/java/com/dianping/cat/storage/dump/LocalMessageBucketManagerTest.java index 0dd13d19fcaacc26c5a1813a545b0945078d41f6..8e46ba7d6f0f8ea0a4ee654f156a435402fa17b5 100644 --- a/cat-core/src/test/java/com/dianping/cat/storage/dump/LocalMessageBucketManagerTest.java +++ b/cat-core/src/test/java/com/dianping/cat/storage/dump/LocalMessageBucketManagerTest.java @@ -87,8 +87,6 @@ public class LocalMessageBucketManagerTest extends ComponentTestCase { Assert.assertNotNull("Message " + i + " not found.", tree); Assert.assertEquals(messageId, tree.getMessageId()); } - - manager.close(); } static class MockMessageIdFactory extends MessageIdFactory { diff --git a/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManager.java b/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManager.java index e4187a24f9a6cf8d342f068b0e58cc2a11721691..17cddc795a48880299889c830ed41f3cf0cdee59 100644 --- a/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManager.java +++ b/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManager.java @@ -3,15 +3,17 @@ package com.dianping.cat.hadoop.hdfs; import java.io.IOException; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; +import org.jboss.netty.util.internal.ConcurrentHashMap; import org.unidal.helper.Threads; import org.unidal.helper.Threads.Task; import org.unidal.lookup.ContainerHolder; @@ -40,24 +42,28 @@ public class HdfsMessageBucketManager extends ContainerHolder implements Message @Inject private ServerConfigManager m_serverConfigManager; - private Map m_buckets = new HashMap(); - - @Override - public void close() throws IOException { - for (HdfsMessageBucket bucket : m_buckets.values()) { - bucket.close(); - } - } + private Map m_buckets = new ConcurrentHashMap(); void closeIdleBuckets() throws IOException { long now = System.currentTimeMillis(); long hour = 3600 * 1000L; + Set closed = new HashSet(); - for (HdfsMessageBucket bucket : m_buckets.values()) { + for (Map.Entry entry : m_buckets.entrySet()) { + HdfsMessageBucket bucket = entry.getValue(); + if (now - bucket.getLastAccessTime() >= hour) { - bucket.close(); + try { + bucket.close(); + closed.add(entry.getKey()); + } catch (Exception e) { + Cat.logError(e); + } } } + for(String close:closed){ + m_buckets.remove(close); + } } @Override @@ -115,7 +121,6 @@ public class HdfsMessageBucketManager extends ContainerHolder implements Message bucket.initialize(dataFile); m_buckets.put(dataFile, bucket); } - if (bucket != null) { MessageTree tree = bucket.findById(messageId); @@ -164,7 +169,7 @@ public class HdfsMessageBucketManager extends ContainerHolder implements Message try { closeIdleBuckets(); } catch (IOException e) { - e.printStackTrace(); + Cat.logError(e); } } } catch (InterruptedException e) { diff --git a/cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManagerTest.java b/cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManagerTest.java index 102d1cb6158e8937998ee83edba72323da438904..0ec4630705c1686a6a3d50af62eee09be8b928d9 100644 --- a/cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManagerTest.java +++ b/cat-hadoop/src/test/java/com/dianping/cat/hadoop/hdfs/HdfsMessageBucketManagerTest.java @@ -78,8 +78,6 @@ public class HdfsMessageBucketManagerTest extends ComponentTestCase { Assert.assertNotNull("Message " + i + " not found.", tree); Assert.assertEquals(messageId, tree.getMessageId()); } - - hdfsManager.close(); } static class MockMessageIdFactory extends MessageIdFactory {