diff --git a/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java b/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java index b668d804fa8c0e0ac288e306a72db273bd492a44..2b2c4909ec2e2c3f10012631c40f1671262dfea6 100644 --- a/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java +++ b/cat-client/src/main/java/com/dianping/cat/message/internal/DefaultMessageProducer.java @@ -36,7 +36,6 @@ public class DefaultMessageProducer implements MessageProducer { @Override public void logError(String message, Throwable cause) { - cause.printStackTrace(); if (Cat.getManager().isCatEnabled()) { if (shouldLog(cause)) { StringWriter writer = new StringWriter(2048); diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java index d9ec85ee43220618a664f89f2fe5cfeec433da8f..56d1e391a73bf84a700ac5b0d821036f4aa01531 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/build/ComponentsConfigurator.java @@ -48,7 +48,7 @@ import com.dianping.cat.core.config.ConfigDao; import com.dianping.cat.core.dal.HourlyReportContentDao; import com.dianping.cat.core.dal.HourlyReportDao; import com.dianping.cat.core.dal.ProjectDao; -import com.dianping.cat.hadoop.hdfs.LogviewUploader; +import com.dianping.cat.hadoop.hdfs.HdfsUploader; import com.dianping.cat.message.spi.core.MessageConsumer; import com.dianping.cat.message.spi.core.MessagePathBuilder; import com.dianping.cat.service.DefaultReportManager; @@ -130,7 +130,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(MessageBucketManager.class, LocalMessageBucketManager.ID, LocalMessageBucketManager.class) // .req(ServerConfigManager.class, MessagePathBuilder.class, ServerStatisticManager.class)// - .req(LogviewUploader.class)); + .req(HdfsUploader.class)); return all; } diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/LocalMessageBucketManager.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/LocalMessageBucketManager.java index e94c28f67e14faabb4658449cf51c66dd491351f..ab4e8bdae09064c79cd50af0c93b916440a25cee 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/LocalMessageBucketManager.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/LocalMessageBucketManager.java @@ -28,7 +28,7 @@ import com.dianping.cat.Cat; import com.dianping.cat.CatConstants; import com.dianping.cat.configuration.NetworkInterfaceManager; import com.dianping.cat.configuration.ServerConfigManager; -import com.dianping.cat.hadoop.hdfs.LogviewUploader; +import com.dianping.cat.hadoop.hdfs.HdfsUploader; import com.dianping.cat.message.Event; import com.dianping.cat.message.Message; import com.dianping.cat.message.MessageProducer; @@ -61,7 +61,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag private MessagePathBuilder m_pathBuilder; @Inject - private LogviewUploader m_logviewUploader; + private HdfsUploader m_logviewUploader; private String m_localIp = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); @@ -119,7 +119,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag m_baseDir = new File(m_configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR)); Threads.forGroup("cat").start(new BlockDumper(m_buckets, m_messageBlocks, m_serverStateManager)); - Threads.forGroup("cat").start(new OldMessageMover(this, m_buckets, m_logviewUploader, m_configManager)); + Threads.forGroup("cat").start(new LogviewUploader(this, m_buckets, m_logviewUploader, m_configManager)); if (m_configManager.isLocalMode()) { m_gzipThreads = 1; diff --git a/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/OldMessageMover.java b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/LogviewUploader.java similarity index 89% rename from cat-consumer/src/main/java/com/dianping/cat/consumer/dump/OldMessageMover.java rename to cat-consumer/src/main/java/com/dianping/cat/consumer/dump/LogviewUploader.java index 358102c28d63a87ab2574fc3c42ec2c15beb9c32..5c2af7ebd17fef1ba895dbb2cc94e1e661447436 100644 --- a/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/OldMessageMover.java +++ b/cat-consumer/src/main/java/com/dianping/cat/consumer/dump/LogviewUploader.java @@ -16,12 +16,12 @@ import org.unidal.helper.Threads.Task; import com.dianping.cat.Cat; import com.dianping.cat.configuration.NetworkInterfaceManager; import com.dianping.cat.configuration.ServerConfigManager; -import com.dianping.cat.hadoop.hdfs.LogviewUploader; +import com.dianping.cat.hadoop.hdfs.HdfsUploader; import com.dianping.cat.message.Message; import com.dianping.cat.message.Transaction; import com.dianping.cat.storage.message.LocalMessageBucket; -public class OldMessageMover implements Task { +public class LogviewUploader implements Task { private LocalMessageBucketManager m_bucketManager; @@ -29,14 +29,14 @@ public class OldMessageMover implements Task { private ConcurrentHashMap m_buckets; - private LogviewUploader m_logviewUploader; + private HdfsUploader m_logviewUploader; private ServerConfigManager m_configManager; private static final long ONE_HOUR = 60 * 60 * 1000L; - public OldMessageMover(LocalMessageBucketManager bucketManager, - ConcurrentHashMap buckets, LogviewUploader logviewUploader, + public LogviewUploader(LocalMessageBucketManager bucketManager, + ConcurrentHashMap buckets, HdfsUploader logviewUploader, ServerConfigManager configManager) { m_baseDir = new File(configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR)); m_bucketManager = bucketManager; @@ -56,7 +56,7 @@ public class OldMessageMover implements Task { private void deleteOldMessages() { final List paths = new ArrayList(); - final Set validPaths = queryValidPath(m_configManager.getLogViewMaxStroageTime()); + final Set validPaths = findValidPath(m_configManager.getLogViewMaxStroageTime()); Scanners.forDir().scan(m_baseDir, new FileMatcher() { @Override @@ -80,16 +80,29 @@ public class OldMessageMover implements Task { }); if (paths.size() > 0) { - processFiles(paths, false); + processLogviewFiles(paths, false); } } + private Set findValidPath(int storageDays) { + Set strs = new HashSet(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); + long currentTimeMillis = System.currentTimeMillis(); + + for (int i = 0; i < storageDays; i++) { + Date date = new Date(currentTimeMillis - i * 24 * 60 * 60 * 1000L); + + strs.add(sdf.format(date)); + } + return strs; + } + @Override public String getName() { return "LocalMessageBucketManager-OldMessageMover"; } - private void processFiles(final List paths, boolean upload) { + private void processLogviewFiles(final List paths, boolean upload) { String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); Transaction t = Cat.newTransaction("System", "Delete" + "-" + ip); @@ -130,19 +143,6 @@ public class OldMessageMover implements Task { t.complete(); } - private Set queryValidPath(int day) { - Set strs = new HashSet(); - SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); - long currentTimeMillis = System.currentTimeMillis(); - - for (int i = 0; i < day; i++) { - Date date = new Date(currentTimeMillis - i * 24 * 60 * 60 * 1000L); - - strs.add(sdf.format(date)); - } - return strs; - } - @Override public void run() { boolean active = true; @@ -223,7 +223,7 @@ public class OldMessageMover implements Task { }); if (paths.size() > 0) { - processFiles(paths, true); + processLogviewFiles(paths, true); } } diff --git a/cat-consumer/src/main/resources/META-INF/plexus/components.xml b/cat-consumer/src/main/resources/META-INF/plexus/components.xml index 8c73886bb8f0ff3b320cd213d18f3de8e6845363..29c8e3a7ff640bd5ad444632bfe74f4acd863858 100644 --- a/cat-consumer/src/main/resources/META-INF/plexus/components.xml +++ b/cat-consumer/src/main/resources/META-INF/plexus/components.xml @@ -328,7 +328,7 @@ com.dianping.cat.statistic.ServerStatisticManager - com.dianping.cat.hadoop.hdfs.LogviewUploader + com.dianping.cat.hadoop.hdfs.HdfsUploader diff --git a/cat-hadoop/src/main/java/com/dianping/cat/hadoop/build/ComponentsConfigurator.java b/cat-hadoop/src/main/java/com/dianping/cat/hadoop/build/ComponentsConfigurator.java index afe0fe1642b292ce9d0587ccdf5db563bde06067..632eaf2062712dc1789df63994aae3f9dea737f2 100644 --- a/cat-hadoop/src/main/java/com/dianping/cat/hadoop/build/ComponentsConfigurator.java +++ b/cat-hadoop/src/main/java/com/dianping/cat/hadoop/build/ComponentsConfigurator.java @@ -10,7 +10,7 @@ import com.dianping.cat.configuration.ServerConfigManager; import com.dianping.cat.hadoop.hdfs.FileSystemManager; import com.dianping.cat.hadoop.hdfs.HdfsMessageBucket; import com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManager; -import com.dianping.cat.hadoop.hdfs.LogviewUploader; +import com.dianping.cat.hadoop.hdfs.HdfsUploader; import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.codec.PlainTextMessageCodec; import com.dianping.cat.message.spi.core.MessagePathBuilder; @@ -29,7 +29,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { all.add(C(FileSystemManager.class) // .req(ServerConfigManager.class)); - all.add(C(LogviewUploader.class) // + all.add(C(HdfsUploader.class) // .req(FileSystemManager.class)); all.add(C(MessageBucket.class, HdfsMessageBucket.ID, HdfsMessageBucket.class) // diff --git a/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/LogviewUploader.java b/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsUploader.java similarity index 98% rename from cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/LogviewUploader.java rename to cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsUploader.java index 1d00f67fe8e1509fa7fafeb25d7e6ec0057fbe3d..ea6ae4972275dbe7ee710770f890330075a227e2 100644 --- a/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/LogviewUploader.java +++ b/cat-hadoop/src/main/java/com/dianping/cat/hadoop/hdfs/HdfsUploader.java @@ -22,7 +22,7 @@ import com.dianping.cat.configuration.ServerConfigManager; import com.dianping.cat.message.Message; import com.dianping.cat.message.Transaction; -public class LogviewUploader implements LogEnabled { +public class HdfsUploader implements LogEnabled { @Inject private FileSystemManager m_fileSystemManager; diff --git a/cat-hadoop/src/main/resources/META-INF/plexus/components.xml b/cat-hadoop/src/main/resources/META-INF/plexus/components.xml index 1e49bc9f0387d1fa57733f2b99ec127214678c4e..a18abeb597a5741e1c5e6ce0936318ce0765a375 100644 --- a/cat-hadoop/src/main/resources/META-INF/plexus/components.xml +++ b/cat-hadoop/src/main/resources/META-INF/plexus/components.xml @@ -10,8 +10,8 @@ - com.dianping.cat.hadoop.hdfs.LogviewUploader - com.dianping.cat.hadoop.hdfs.LogviewUploader + com.dianping.cat.hadoop.hdfs.HdfsUploader + com.dianping.cat.hadoop.hdfs.HdfsUploader com.dianping.cat.hadoop.hdfs.FileSystemManager