提交 a0f31552 编写于 作者: Y youyong205

refactor localmessage manager

...@@ -36,7 +36,6 @@ public class DefaultMessageProducer implements MessageProducer { ...@@ -36,7 +36,6 @@ public class DefaultMessageProducer implements MessageProducer {
@Override @Override
public void logError(String message, Throwable cause) { public void logError(String message, Throwable cause) {
cause.printStackTrace();
if (Cat.getManager().isCatEnabled()) { if (Cat.getManager().isCatEnabled()) {
if (shouldLog(cause)) { if (shouldLog(cause)) {
StringWriter writer = new StringWriter(2048); StringWriter writer = new StringWriter(2048);
......
...@@ -48,7 +48,7 @@ import com.dianping.cat.core.config.ConfigDao; ...@@ -48,7 +48,7 @@ import com.dianping.cat.core.config.ConfigDao;
import com.dianping.cat.core.dal.HourlyReportContentDao; import com.dianping.cat.core.dal.HourlyReportContentDao;
import com.dianping.cat.core.dal.HourlyReportDao; import com.dianping.cat.core.dal.HourlyReportDao;
import com.dianping.cat.core.dal.ProjectDao; import com.dianping.cat.core.dal.ProjectDao;
import com.dianping.cat.hadoop.hdfs.LogviewUploader; import com.dianping.cat.hadoop.hdfs.HdfsUploader;
import com.dianping.cat.message.spi.core.MessageConsumer; import com.dianping.cat.message.spi.core.MessageConsumer;
import com.dianping.cat.message.spi.core.MessagePathBuilder; import com.dianping.cat.message.spi.core.MessagePathBuilder;
import com.dianping.cat.service.DefaultReportManager; import com.dianping.cat.service.DefaultReportManager;
...@@ -130,7 +130,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -130,7 +130,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageBucketManager.class, LocalMessageBucketManager.ID, LocalMessageBucketManager.class) // all.add(C(MessageBucketManager.class, LocalMessageBucketManager.ID, LocalMessageBucketManager.class) //
.req(ServerConfigManager.class, MessagePathBuilder.class, ServerStatisticManager.class)// .req(ServerConfigManager.class, MessagePathBuilder.class, ServerStatisticManager.class)//
.req(LogviewUploader.class)); .req(HdfsUploader.class));
return all; return all;
} }
......
...@@ -28,7 +28,7 @@ import com.dianping.cat.Cat; ...@@ -28,7 +28,7 @@ import com.dianping.cat.Cat;
import com.dianping.cat.CatConstants; import com.dianping.cat.CatConstants;
import com.dianping.cat.configuration.NetworkInterfaceManager; import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.configuration.ServerConfigManager; import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.hadoop.hdfs.LogviewUploader; import com.dianping.cat.hadoop.hdfs.HdfsUploader;
import com.dianping.cat.message.Event; import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message; import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer; import com.dianping.cat.message.MessageProducer;
...@@ -61,7 +61,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag ...@@ -61,7 +61,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
private MessagePathBuilder m_pathBuilder; private MessagePathBuilder m_pathBuilder;
@Inject @Inject
private LogviewUploader m_logviewUploader; private HdfsUploader m_logviewUploader;
private String m_localIp = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); private String m_localIp = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
...@@ -119,7 +119,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag ...@@ -119,7 +119,7 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
m_baseDir = new File(m_configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR)); m_baseDir = new File(m_configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR));
Threads.forGroup("cat").start(new BlockDumper(m_buckets, m_messageBlocks, m_serverStateManager)); Threads.forGroup("cat").start(new BlockDumper(m_buckets, m_messageBlocks, m_serverStateManager));
Threads.forGroup("cat").start(new OldMessageMover(this, m_buckets, m_logviewUploader, m_configManager)); Threads.forGroup("cat").start(new LogviewUploader(this, m_buckets, m_logviewUploader, m_configManager));
if (m_configManager.isLocalMode()) { if (m_configManager.isLocalMode()) {
m_gzipThreads = 1; m_gzipThreads = 1;
......
...@@ -16,12 +16,12 @@ import org.unidal.helper.Threads.Task; ...@@ -16,12 +16,12 @@ import org.unidal.helper.Threads.Task;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager; import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.configuration.ServerConfigManager; import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.hadoop.hdfs.LogviewUploader; import com.dianping.cat.hadoop.hdfs.HdfsUploader;
import com.dianping.cat.message.Message; import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction; import com.dianping.cat.message.Transaction;
import com.dianping.cat.storage.message.LocalMessageBucket; import com.dianping.cat.storage.message.LocalMessageBucket;
public class OldMessageMover implements Task { public class LogviewUploader implements Task {
private LocalMessageBucketManager m_bucketManager; private LocalMessageBucketManager m_bucketManager;
...@@ -29,14 +29,14 @@ public class OldMessageMover implements Task { ...@@ -29,14 +29,14 @@ public class OldMessageMover implements Task {
private ConcurrentHashMap<String, LocalMessageBucket> m_buckets; private ConcurrentHashMap<String, LocalMessageBucket> m_buckets;
private LogviewUploader m_logviewUploader; private HdfsUploader m_logviewUploader;
private ServerConfigManager m_configManager; private ServerConfigManager m_configManager;
private static final long ONE_HOUR = 60 * 60 * 1000L; private static final long ONE_HOUR = 60 * 60 * 1000L;
public OldMessageMover(LocalMessageBucketManager bucketManager, public LogviewUploader(LocalMessageBucketManager bucketManager,
ConcurrentHashMap<String, LocalMessageBucket> buckets, LogviewUploader logviewUploader, ConcurrentHashMap<String, LocalMessageBucket> buckets, HdfsUploader logviewUploader,
ServerConfigManager configManager) { ServerConfigManager configManager) {
m_baseDir = new File(configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR)); m_baseDir = new File(configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR));
m_bucketManager = bucketManager; m_bucketManager = bucketManager;
...@@ -56,7 +56,7 @@ public class OldMessageMover implements Task { ...@@ -56,7 +56,7 @@ public class OldMessageMover implements Task {
private void deleteOldMessages() { private void deleteOldMessages() {
final List<String> paths = new ArrayList<String>(); final List<String> paths = new ArrayList<String>();
final Set<String> validPaths = queryValidPath(m_configManager.getLogViewMaxStroageTime()); final Set<String> validPaths = findValidPath(m_configManager.getLogViewMaxStroageTime());
Scanners.forDir().scan(m_baseDir, new FileMatcher() { Scanners.forDir().scan(m_baseDir, new FileMatcher() {
@Override @Override
...@@ -80,16 +80,29 @@ public class OldMessageMover implements Task { ...@@ -80,16 +80,29 @@ public class OldMessageMover implements Task {
}); });
if (paths.size() > 0) { if (paths.size() > 0) {
processFiles(paths, false); processLogviewFiles(paths, false);
} }
} }
private Set<String> findValidPath(int storageDays) {
Set<String> strs = new HashSet<String>();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < storageDays; i++) {
Date date = new Date(currentTimeMillis - i * 24 * 60 * 60 * 1000L);
strs.add(sdf.format(date));
}
return strs;
}
@Override @Override
public String getName() { public String getName() {
return "LocalMessageBucketManager-OldMessageMover"; return "LocalMessageBucketManager-OldMessageMover";
} }
private void processFiles(final List<String> paths, boolean upload) { private void processLogviewFiles(final List<String> paths, boolean upload) {
String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
Transaction t = Cat.newTransaction("System", "Delete" + "-" + ip); Transaction t = Cat.newTransaction("System", "Delete" + "-" + ip);
...@@ -130,19 +143,6 @@ public class OldMessageMover implements Task { ...@@ -130,19 +143,6 @@ public class OldMessageMover implements Task {
t.complete(); t.complete();
} }
private Set<String> queryValidPath(int day) {
Set<String> strs = new HashSet<String>();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < day; i++) {
Date date = new Date(currentTimeMillis - i * 24 * 60 * 60 * 1000L);
strs.add(sdf.format(date));
}
return strs;
}
@Override @Override
public void run() { public void run() {
boolean active = true; boolean active = true;
...@@ -223,7 +223,7 @@ public class OldMessageMover implements Task { ...@@ -223,7 +223,7 @@ public class OldMessageMover implements Task {
}); });
if (paths.size() > 0) { if (paths.size() > 0) {
processFiles(paths, true); processLogviewFiles(paths, true);
} }
} }
......
...@@ -328,7 +328,7 @@ ...@@ -328,7 +328,7 @@
<role>com.dianping.cat.statistic.ServerStatisticManager</role> <role>com.dianping.cat.statistic.ServerStatisticManager</role>
</requirement> </requirement>
<requirement> <requirement>
<role>com.dianping.cat.hadoop.hdfs.LogviewUploader</role> <role>com.dianping.cat.hadoop.hdfs.HdfsUploader</role>
</requirement> </requirement>
</requirements> </requirements>
</component> </component>
......
...@@ -10,7 +10,7 @@ import com.dianping.cat.configuration.ServerConfigManager; ...@@ -10,7 +10,7 @@ import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.hadoop.hdfs.FileSystemManager; import com.dianping.cat.hadoop.hdfs.FileSystemManager;
import com.dianping.cat.hadoop.hdfs.HdfsMessageBucket; import com.dianping.cat.hadoop.hdfs.HdfsMessageBucket;
import com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManager; import com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManager;
import com.dianping.cat.hadoop.hdfs.LogviewUploader; import com.dianping.cat.hadoop.hdfs.HdfsUploader;
import com.dianping.cat.message.spi.MessageCodec; import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec; import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.core.MessagePathBuilder; import com.dianping.cat.message.spi.core.MessagePathBuilder;
...@@ -29,7 +29,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -29,7 +29,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(FileSystemManager.class) // all.add(C(FileSystemManager.class) //
.req(ServerConfigManager.class)); .req(ServerConfigManager.class));
all.add(C(LogviewUploader.class) // all.add(C(HdfsUploader.class) //
.req(FileSystemManager.class)); .req(FileSystemManager.class));
all.add(C(MessageBucket.class, HdfsMessageBucket.ID, HdfsMessageBucket.class) // all.add(C(MessageBucket.class, HdfsMessageBucket.ID, HdfsMessageBucket.class) //
......
...@@ -22,7 +22,7 @@ import com.dianping.cat.configuration.ServerConfigManager; ...@@ -22,7 +22,7 @@ import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.message.Message; import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction; import com.dianping.cat.message.Transaction;
public class LogviewUploader implements LogEnabled { public class HdfsUploader implements LogEnabled {
@Inject @Inject
private FileSystemManager m_fileSystemManager; private FileSystemManager m_fileSystemManager;
......
...@@ -10,8 +10,8 @@ ...@@ -10,8 +10,8 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.hadoop.hdfs.LogviewUploader</role> <role>com.dianping.cat.hadoop.hdfs.HdfsUploader</role>
<implementation>com.dianping.cat.hadoop.hdfs.LogviewUploader</implementation> <implementation>com.dianping.cat.hadoop.hdfs.HdfsUploader</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role> <role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册