提交 f6eb6db7 编写于 作者: Y youyong205

refactor local message,fix 2hour load message bug

上级 abda83f4
......@@ -14,6 +14,10 @@
<groupId>com.dianping.cat</groupId>
<artifactId>cat-core</artifactId>
</dependency>
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-hadoop</artifactId>
</dependency>
<dependency>
<groupId>org.unidal.framework</groupId>
<artifactId>test-framework</artifactId>
......
......@@ -23,6 +23,7 @@ import com.dianping.cat.consumer.dependency.DatabaseParser;
import com.dianping.cat.consumer.dependency.DependencyAnalyzer;
import com.dianping.cat.consumer.dependency.DependencyDelegate;
import com.dianping.cat.consumer.dump.DumpAnalyzer;
import com.dianping.cat.consumer.dump.LocalMessageBucketManager;
import com.dianping.cat.consumer.event.EventAnalyzer;
import com.dianping.cat.consumer.event.EventDelegate;
import com.dianping.cat.consumer.heartbeat.HeartbeatAnalyzer;
......@@ -47,14 +48,15 @@ import com.dianping.cat.core.config.ConfigDao;
import com.dianping.cat.core.dal.HourlyReportContentDao;
import com.dianping.cat.core.dal.HourlyReportDao;
import com.dianping.cat.core.dal.ProjectDao;
import com.dianping.cat.hadoop.hdfs.LogviewUploader;
import com.dianping.cat.message.spi.core.MessageConsumer;
import com.dianping.cat.message.spi.core.MessagePathBuilder;
import com.dianping.cat.service.DefaultReportManager;
import com.dianping.cat.service.HostinfoService;
import com.dianping.cat.service.ProjectService;
import com.dianping.cat.service.ReportDelegate;
import com.dianping.cat.service.ReportManager;
import com.dianping.cat.statistic.ServerStatisticManager;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
import com.dianping.cat.storage.message.MessageBucketManager;
import com.dianping.cat.storage.report.ReportBucketManager;
import com.dianping.cat.task.TaskManager;
......@@ -93,6 +95,11 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(MessageAnalyzer.class, DumpAnalyzer.ID, DumpAnalyzer.class).is(PER_LOOKUP) //
.req(ServerStatisticManager.class) //
.req(MessageBucketManager.class, LocalMessageBucketManager.ID));
all.add(C(MessageBucketManager.class, LocalMessageBucketManager.ID, LocalMessageBucketManager.class) //
.req(ServerConfigManager.class, MessagePathBuilder.class, ServerStatisticManager.class)//
.req(LogviewUploader.class));
return all;
}
......
......@@ -15,7 +15,6 @@ import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.statistic.ServerStatisticManager;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
import com.dianping.cat.storage.message.MessageBucketManager;
public class DumpAnalyzer extends AbstractMessageAnalyzer<Object> implements LogEnabled {
......
package com.dianping.cat.storage.message;
package com.dianping.cat.consumer.dump;
import java.io.File;
import java.io.IOException;
......@@ -28,6 +28,7 @@ import com.dianping.cat.Cat;
import com.dianping.cat.CatConstants;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.hadoop.hdfs.LogviewUploader;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
......@@ -37,6 +38,10 @@ import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.core.MessagePathBuilder;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.statistic.ServerStatisticManager;
import com.dianping.cat.storage.message.LocalMessageBucket;
import com.dianping.cat.storage.message.MessageBlock;
import com.dianping.cat.storage.message.MessageBucket;
import com.dianping.cat.storage.message.MessageBucketManager;
public class LocalMessageBucketManager extends ContainerHolder implements MessageBucketManager, Initializable,
LogEnabled {
......@@ -57,6 +62,9 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
@Inject
private MessagePathBuilder m_pathBuilder;
@Inject
private LogviewUploader m_logviewUploader;
private String m_localIp = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
private long m_error;
......@@ -242,7 +250,20 @@ public class LocalMessageBucketManager extends ContainerHolder implements Messag
}
}
private void moveFile(String path) throws IOException {
private void moveFile(String path) {
File file = new File(m_baseDir, path);
boolean success = m_logviewUploader.uploadLogviewFile(path, file);
if (success) {
File parent = file.getParentFile();
file.delete();
parent.delete(); // delete it if empty
parent.getParentFile().delete(); // delete it if empty
}
}
protected void moveFileOld(String path) throws IOException {
File outbox = new File(m_baseDir, "outbox");
File from = new File(m_baseDir, path);
File parent = from.getParentFile();
......
......@@ -293,6 +293,25 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.message.MessageBucketManager</role>
<role-hint>local</role-hint>
<implementation>com.dianping.cat.consumer.dump.LocalMessageBucketManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.core.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.statistic.ServerStatisticManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.hadoop.hdfs.LogviewUploader</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.service.ProjectService</role>
<implementation>com.dianping.cat.service.ProjectService</implementation>
......
......@@ -15,6 +15,7 @@ import com.dianping.cat.consumer.cross.CrossReportMergerTest;
import com.dianping.cat.consumer.dependency.DependencyAnalyzerTest;
import com.dianping.cat.consumer.dependency.DependencyReportMergerTest;
import com.dianping.cat.consumer.dump.DumpAnalyzerTest;
import com.dianping.cat.consumer.dump.LocalMessageBucketManagerTest;
import com.dianping.cat.consumer.event.EventAnalyzerTest;
import com.dianping.cat.consumer.event.EventReportMergerTest;
import com.dianping.cat.consumer.heartbeat.HeartbeatAnalyzerTest;
......
......@@ -13,7 +13,6 @@ import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.statistic.ServerStatisticManager;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
public class DumpAnalyzerTest {
......
package com.dianping.cat.storage.message;
package com.dianping.cat.consumer.dump;
import java.io.File;
import java.io.IOException;
......@@ -21,7 +21,6 @@ import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
import com.dianping.cat.storage.message.MessageBucketManager;
@RunWith(JUnit4.class)
......
......@@ -199,6 +199,10 @@ public class MetricAnalyzerTest extends ComponentTestCase {
return new MockStringBucket();
}
@Override
public void clearOldReports() {
}
}
public class MockBusinessReportDao extends BusinessReportDao {
......
......@@ -35,10 +35,6 @@ import com.dianping.cat.message.spi.core.TcpSocketReceiver;
import com.dianping.cat.service.HostinfoService;
import com.dianping.cat.service.IpService;
import com.dianping.cat.statistic.ServerStatisticManager;
import com.dianping.cat.storage.message.LocalMessageBucket;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
import com.dianping.cat.storage.message.MessageBucket;
import com.dianping.cat.storage.message.MessageBucketManager;
import com.dianping.cat.task.TaskManager;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
......@@ -81,12 +77,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(UrlPatternConfigManager.class).req(ConfigDao.class, UrlPatternHandler.class, ContentFetcher.class));
all.add(C(MessageBucket.class, LocalMessageBucket.ID, LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, PlainTextMessageCodec.ID));
all.add(C(MessageBucketManager.class, LocalMessageBucketManager.ID, LocalMessageBucketManager.class) //
.req(ServerConfigManager.class, MessagePathBuilder.class, ServerStatisticManager.class));
all.add(C(Module.class, CatCoreModule.ID, CatCoreModule.class));
all.addAll(new CatCoreDatabaseConfigurator().defineComponents());
......
......@@ -7,23 +7,31 @@ import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.core.MessagePathBuilder;
import com.dianping.cat.storage.report.ReportBucket;
import com.dianping.cat.storage.report.ReportBucketManager;
import com.dianping.cat.storage.message.LocalMessageBucket;
import com.dianping.cat.storage.message.MessageBucket;
import com.dianping.cat.storage.report.DefaultReportBucketManager;
import com.dianping.cat.storage.report.LocalReportBucket;
import com.dianping.cat.storage.report.ReportBucket;
import com.dianping.cat.storage.report.ReportBucketManager;
class StorageComponentConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(ReportBucketManager.class, DefaultReportBucketManager.class));
all.add(C(ReportBucketManager.class, DefaultReportBucketManager.class).req(ServerConfigManager.class));
all.add(C(ReportBucket.class, String.class.getName() + "-report", LocalReportBucket.class) //
.is(PER_LOOKUP) //
.req(ServerConfigManager.class, MessagePathBuilder.class));
all.add(C(MessageBucket.class, LocalMessageBucket.ID, LocalMessageBucket.class) //
.is(PER_LOOKUP) //
.req(MessageCodec.class, PlainTextMessageCodec.ID));
return all;
}
}
......@@ -319,7 +319,11 @@ public class ServerConfigManager implements Initializable, LogEnabled {
}
public boolean isHdfsOn() {
return !m_config.getStorage().isHdfsDisabled();
if (m_config != null) {
return !m_config.getStorage().isHdfsDisabled();
} else {
return false;
}
}
public boolean isInitialized() {
......
......@@ -100,7 +100,7 @@ public class LocalMessageBucket implements MessageBucket {
}
}
protected MessageBlock flushBlock() throws IOException {
public MessageBlock flushBlock() throws IOException {
if (m_dirty.get()) {
synchronized (this) {
m_out.close();
......@@ -132,6 +132,7 @@ public class LocalMessageBucket implements MessageBucket {
return m_writer;
}
@Override
public void initialize(String dataFile) throws IOException {
m_dataFile = dataFile;
......@@ -153,7 +154,7 @@ public class LocalMessageBucket implements MessageBucket {
m_codec = codec;
}
protected synchronized MessageBlock storeMessage(final ChannelBuffer buf, final MessageId id) throws IOException {
public synchronized MessageBlock storeMessage(final ChannelBuffer buf, final MessageId id) throws IOException {
int size = buf.readableBytes();
m_dirty.set(true);
......
......@@ -7,7 +7,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.zip.GZIPInputStream;
class MessageBlockReader {
public class MessageBlockReader {
private RandomAccessFile m_indexFile;
private RandomAccessFile m_dataFile;
......
......@@ -4,7 +4,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
class MessageBlockWriter {
public class MessageBlockWriter {
private RandomAccessFile m_indexFile;
private RandomAccessFile m_dataFile;
......
package com.dianping.cat.storage.report;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.helper.Scanners;
import org.unidal.helper.Scanners.FileMatcher;
import org.unidal.lookup.ContainerHolder;
import org.unidal.lookup.annotation.Inject;
public class DefaultReportBucketManager extends ContainerHolder implements ReportBucketManager {
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Transaction;
public class DefaultReportBucketManager extends ContainerHolder implements ReportBucketManager, Initializable {
@Inject
private ServerConfigManager m_configManager;
private String m_reportBaseDir;
@Override
public void clearOldReports() {
Transaction t = Cat.newTransaction("System", "DeleteReport");
try {
File reportDir = new File(m_reportBaseDir);
final List<String> toRemovePaths = new ArrayList<String>();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
final String today = sdf.format(date);
final String yesterday = sdf.format(new Date(date.getTime() - 24 * 60 * 60 * 1000L));
Scanners.forDir().scan(reportDir, new FileMatcher() {
@Override
public Direction matches(File base, String path) {
File file = new File(base, path);
if (file.isFile() && shouldDeleteReport(path)) {
toRemovePaths.add(path);
}
return Direction.DOWN;
}
private boolean shouldDeleteReport(String path) {
if (path.indexOf(today) > -1 || path.indexOf(yesterday) > -1) {
return false;
} else {
return true;
}
}
});
for (String path : toRemovePaths) {
File file = new File(m_reportBaseDir, path);
file.delete();
Cat.logEvent("System", "DeleteReport", Event.SUCCESS, file.getAbsolutePath());
}
removeEmptyDir(reportDir);
t.setStatus(Transaction.SUCCESS);
} catch (Exception e) {
Cat.logError(e);
t.setStatus(e);
} finally {
t.complete();
}
}
@Override
public void closeBucket(ReportBucket<?> bucket) {
......@@ -18,7 +82,8 @@ public class DefaultReportBucketManager extends ContainerHolder implements Repor
}
}
private ReportBucket<?> createBucket(Class<?> type, Date timestamp, String name, String namespace) throws IOException {
private ReportBucket<?> createBucket(Class<?> type, Date timestamp, String name, String namespace)
throws IOException {
ReportBucket<?> bucket = lookup(ReportBucket.class, type.getName() + "-" + namespace);
bucket.initialize(type, name, timestamp);
......@@ -26,7 +91,8 @@ public class DefaultReportBucketManager extends ContainerHolder implements Repor
}
@SuppressWarnings("unchecked")
private <T> ReportBucket<T> getBucket(Class<T> type, long timestamp, String name, String namespace) throws IOException {
private <T> ReportBucket<T> getBucket(Class<T> type, long timestamp, String name, String namespace)
throws IOException {
Date date = new Date(timestamp);
ReportBucket<?> bucket = createBucket(type, date, name, namespace);
......@@ -38,4 +104,35 @@ public class DefaultReportBucketManager extends ContainerHolder implements Repor
return getBucket(String.class, timestamp, name, "report");
}
@Override
public void initialize() throws InitializationException {
m_reportBaseDir = m_configManager.getHdfsLocalBaseDir("report");
}
private void removeEmptyDir(File baseFile) {
// the path has two depth
for (int i = 0; i < 2; i++) {
final List<String> directionPaths = new ArrayList<String>();
Scanners.forDir().scan(baseFile, new FileMatcher() {
@Override
public Direction matches(File base, String path) {
if (new File(base, path).isDirectory()) {
directionPaths.add(path);
}
return Direction.DOWN;
}
});
for (String path : directionPaths) {
try {
File file = new File(baseFile, path);
file.delete();
} catch (Exception e) {
}
}
}
}
}
......@@ -5,6 +5,8 @@ import java.io.IOException;
public interface ReportBucketManager {
public void closeBucket(ReportBucket<?> bucket);
public void clearOldReports();
public ReportBucket<String> getReportBucket(long timestamp, String name) throws IOException;
}
......@@ -139,34 +139,6 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.message.MessageBucket</role>
<role-hint>local</role-hint>
<implementation>com.dianping.cat.storage.message.LocalMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.message.MessageBucketManager</role>
<role-hint>local</role-hint>
<implementation>com.dianping.cat.storage.message.LocalMessageBucketManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
<requirement>
<role>com.dianping.cat.message.spi.core.MessagePathBuilder</role>
</requirement>
<requirement>
<role>com.dianping.cat.statistic.ServerStatisticManager</role>
</requirement>
</requirements>
</component>
<component>
<role>org.unidal.initialization.Module</role>
<role-hint>cat-core</role-hint>
......@@ -411,6 +383,11 @@
<component>
<role>com.dianping.cat.storage.report.ReportBucketManager</role>
<implementation>com.dianping.cat.storage.report.DefaultReportBucketManager</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.configuration.ServerConfigManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.report.ReportBucket</role>
......@@ -426,5 +403,17 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.message.MessageBucket</role>
<role-hint>local</role-hint>
<implementation>com.dianping.cat.storage.message.LocalMessageBucket</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessageCodec</role>
<role-hint>plain-text</role-hint>
</requirement>
</requirements>
</component>
</components>
</plexus>
......@@ -127,6 +127,10 @@ public class DefaultReportManagerTest {
return new MockStringBucket();
}
@Override
public void clearOldReports() {
}
}
public class MockHourlyReportDao extends HourlyReportDao {
......
......@@ -7,6 +7,7 @@ import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.hadoop.hdfs.LogviewUploader;
import com.dianping.cat.hadoop.hdfs.UploaderAndCleaner;
import com.dianping.cat.hadoop.hdfs.FileSystemManager;
import com.dianping.cat.hadoop.hdfs.HdfsMessageBucket;
......@@ -32,6 +33,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
all.add(C(UploaderAndCleaner.class) //
.req(ServerConfigManager.class, FileSystemManager.class));
all.add(C(LogviewUploader.class) //
.req(FileSystemManager.class));
all.add(C(MessageBucket.class, HdfsMessageBucket.ID, HdfsMessageBucket.class) //
.is(PER_LOOKUP) //
.req(FileSystemManager.class) //
......
package com.dianping.cat.hadoop.hdfs;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Files;
import org.unidal.helper.Files.AutoClose;
import org.unidal.helper.Formats;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
public class LogviewUploader implements LogEnabled {
@Inject
private FileSystemManager m_fileSystemManager;
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
private FSDataOutputStream makeHdfsOutputStream(String path) throws IOException {
StringBuilder baseDir = new StringBuilder(32);
FileSystem fs = m_fileSystemManager.getFileSystem("dump", baseDir);
Path file = new Path(baseDir.toString(), path);
FSDataOutputStream out;
try {
out = fs.create(file);
} catch (RemoteException re) {
fs.delete(file, false);
out = fs.create(file);
} catch (AlreadyBeingCreatedException e) {
fs.delete(file, false);
out = fs.create(file);
}
return out;
}
public boolean uploadLogviewFile(String path, File file) {
Transaction t = Cat.newTransaction("System", "UploadDump");
t.addData("file", path);
FSDataOutputStream fdos = null;
FileInputStream fis = null;
try {
fdos = makeHdfsOutputStream(path);
fis = new FileInputStream(file);
long start = System.currentTimeMillis();
Files.forIO().copy(fis, fdos, AutoClose.INPUT_OUTPUT);
double sec = (System.currentTimeMillis() - start) / 1000d;
String size = Formats.forNumber().format(file.length(), "0.#", "B");
String speed = sec <= 0 ? "N/A" : Formats.forNumber().format(file.length() / sec, "0.0", "B/s");
t.addData("size", size);
t.addData("speed", speed);
t.setStatus(Message.SUCCESS);
if (!file.delete()) {
m_logger.warn("Can't delete file: " + file);
}
return true;
} catch (AlreadyBeingCreatedException e) {
Cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("Already being created (%s)!", path), e);
} catch (AccessControlException e) {
Cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("No permission to create HDFS file(%s)!", path), e);
} catch (Exception e) {
Cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("Uploading file(%s) to HDFS(%s) failed!", file, path), e);
} finally {
try {
if (fdos != null) {
fdos.close();
}
} catch (IOException e) {
Cat.logError(e);
}
t.complete();
}
return false;
}
}
......@@ -33,7 +33,6 @@ import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Transaction;
public class UploaderAndCleaner implements Initializable, Task, LogEnabled {
......@@ -173,7 +172,7 @@ public class UploaderAndCleaner implements Initializable, Task, LogEnabled {
Calendar cal = Calendar.getInstance();
if (cal.get(Calendar.MINUTE) >= 10) {
uploadLogviewFile();
uploadLogviewFiles();
deleteOldReports();
}
}
......@@ -199,7 +198,7 @@ public class UploaderAndCleaner implements Initializable, Task, LogEnabled {
}
}
private void uploadLogviewFile() {
private void uploadLogviewFiles() {
File baseDir = new File(m_dumpBaseDir, "outbox");
final List<String> paths = new ArrayList<String>();
......@@ -219,9 +218,8 @@ public class UploaderAndCleaner implements Initializable, Task, LogEnabled {
if (len > 0) {
Cat.setup("DumpUploader");
MessageProducer cat = Cat.getProducer();
String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
Transaction root = cat.newTransaction("System", "Dump-" + ip);
Transaction root = Cat.newTransaction("System", "Dump-" + ip);
Collections.sort(paths);
......@@ -230,55 +228,9 @@ public class UploaderAndCleaner implements Initializable, Task, LogEnabled {
for (int i = 0; i < len; i++) {
String path = paths.get(i);
Transaction t = cat.newTransaction("System", "UploadDump");
File file = new File(baseDir, path);
t.addData("file", path);
FSDataOutputStream fdos = null;
FileInputStream fis = null;
try {
fdos = makeHdfsOutputStream(path);
fis = new FileInputStream(file);
long start = System.currentTimeMillis();
Files.forIO().copy(fis, fdos, AutoClose.INPUT_OUTPUT);
double sec = (System.currentTimeMillis() - start) / 1000d;
String size = Formats.forNumber().format(file.length(), "0.#", "B");
String speed = sec <= 0 ? "N/A" : Formats.forNumber().format(file.length() / sec, "0.0", "B/s");
t.addData("size", size);
t.addData("speed", speed);
t.setStatus(Message.SUCCESS);
if (!file.delete()) {
m_logger.warn("Can't delete file: " + file);
}
} catch (AlreadyBeingCreatedException e) {
Cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("Already being created (%s)!", path), e);
} catch (AccessControlException e) {
cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("No permission to create HDFS file(%s)!", path), e);
} catch (Exception e) {
cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("Uploading file(%s) to HDFS(%s) failed!", file, path), e);
} finally {
try {
if (fdos != null) {
fdos.close();
}
} catch (IOException e) {
Cat.logError(e);
}
t.complete();
}
uploadLogviewFile( path, file);
try {
Thread.sleep(100);
......@@ -291,4 +243,54 @@ public class UploaderAndCleaner implements Initializable, Task, LogEnabled {
}
removeEmptyDir(baseDir);
}
private void uploadLogviewFile( String path, File file) {
Transaction t = Cat.newTransaction("System", "UploadDump");
t.addData("file", path);
FSDataOutputStream fdos = null;
FileInputStream fis = null;
try {
fdos = makeHdfsOutputStream(path);
fis = new FileInputStream(file);
long start = System.currentTimeMillis();
Files.forIO().copy(fis, fdos, AutoClose.INPUT_OUTPUT);
double sec = (System.currentTimeMillis() - start) / 1000d;
String size = Formats.forNumber().format(file.length(), "0.#", "B");
String speed = sec <= 0 ? "N/A" : Formats.forNumber().format(file.length() / sec, "0.0", "B/s");
t.addData("size", size);
t.addData("speed", speed);
t.setStatus(Message.SUCCESS);
if (!file.delete()) {
m_logger.warn("Can't delete file: " + file);
}
} catch (AlreadyBeingCreatedException e) {
Cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("Already being created (%s)!", path), e);
} catch (AccessControlException e) {
Cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("No permission to create HDFS file(%s)!", path), e);
} catch (Exception e) {
Cat.logError(e);
t.setStatus(e);
m_logger.error(String.format("Uploading file(%s) to HDFS(%s) failed!", file, path), e);
} finally {
try {
if (fdos != null) {
fdos.close();
}
} catch (IOException e) {
Cat.logError(e);
}
t.complete();
}
}
}
......@@ -21,6 +21,15 @@
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.hadoop.hdfs.LogviewUploader</role>
<implementation>com.dianping.cat.hadoop.hdfs.LogviewUploader</implementation>
<requirements>
<requirement>
<role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
</requirement>
</requirements>
</component>
<component>
<role>com.dianping.cat.storage.message.MessageBucket</role>
<role-hint>hdfs</role-hint>
......
......@@ -4,13 +4,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import com.dianping.cat.hadoop.hdfs.HdfsMessageBucketManagerTest;
@RunWith(Suite.class)
@SuiteClasses({
HdfsMessageBucketManagerTest.class
})
public class AllTests {
......
package com.dianping.cat.hadoop.hdfs;
import java.io.File;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.unidal.lookup.ComponentTestCase;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageId;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
import com.dianping.cat.storage.message.MessageBucketManager;
@RunWith(JUnit4.class)
public class HdfsMessageBucketManagerTest extends ComponentTestCase {
private MessageTree newMessageTree(String id, int i, long timestamp) {
MessageTree tree = new DefaultMessageTree();
tree.setDomain("target");
tree.setHostName("localhost");
tree.setIpAddress("127.0.0.1");
tree.setMessageId(id);
tree.setParentMessageId("parentMessageId" + i);
tree.setRootMessageId("rootMessageId" + i);
tree.setSessionToken("sessionToken");
tree.setThreadGroupName("threadGroupName");
tree.setThreadId("threadId" + i);
tree.setThreadName("threadName");
tree.setMessage(newTransaction("type", "name" + i, timestamp, "0", 123456 + i, "data" + i));
return tree;
}
private Transaction newTransaction(String type, String name, long timestamp, String status, int duration, String data) {
DefaultTransaction transaction = new DefaultTransaction(type, name, null);
transaction.setStatus(status);
transaction.addData(data);
transaction.complete();
transaction.setTimestamp(timestamp);
transaction.setDurationInMillis(duration);
return transaction;
}
@Test
@Ignore
public void testReadWrite() throws Exception {
LocalMessageBucketManager localManager = (LocalMessageBucketManager) lookup(MessageBucketManager.class,
LocalMessageBucketManager.ID);
MessageBucketManager hdfsManager = lookup(MessageBucketManager.class, HdfsMessageBucketManager.ID);
MessageIdFactory factory = new MockMessageIdFactory();
long now = 1343532130488L;
int num = 100;
factory.setIpAddress("7f000001");
factory.initialize("source");
localManager.setBaseDir(new File("target/bucket/hdfs/dump")); // make local and hdfs base dir same
for (int i = 0; i < num; i++) {
MessageTree tree = newMessageTree(factory.getNextId(), i, now + i * 10L);
MessageId id = MessageId.parse(tree.getMessageId());
localManager.storeMessage(tree,id);
}
for (int i = 0; i < num; i++) {
String messageId = "source-7f000001-373203-" + i;
MessageTree tree = hdfsManager.loadMessage(messageId);
Assert.assertNotNull("Message " + i + " not found.", tree);
Assert.assertEquals(messageId, tree.getMessageId());
}
}
static class MockMessageIdFactory extends MessageIdFactory {
@Override
protected long getTimestamp() {
return 1343532130488L / 3600 / 1000;
}
}
}
......@@ -9,6 +9,7 @@ import org.unidal.lookup.configuration.Component;
import com.dianping.cat.configuration.ServerConfigManager;
import com.dianping.cat.consumer.cross.CrossAnalyzer;
import com.dianping.cat.consumer.dependency.DependencyAnalyzer;
import com.dianping.cat.consumer.dump.LocalMessageBucketManager;
import com.dianping.cat.consumer.event.EventAnalyzer;
import com.dianping.cat.consumer.heartbeat.HeartbeatAnalyzer;
import com.dianping.cat.consumer.matrix.MatrixAnalyzer;
......@@ -56,7 +57,6 @@ import com.dianping.cat.report.page.model.transaction.HistoricalTransactionServi
import com.dianping.cat.report.page.model.transaction.LocalTransactionService;
import com.dianping.cat.report.service.ReportServiceManager;
import com.dianping.cat.report.task.cached.CachedReportTask;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
import com.dianping.cat.storage.message.MessageBucketManager;
import com.dianping.cat.storage.report.ReportBucketManager;
......
......@@ -16,8 +16,8 @@ import com.dianping.cat.report.page.model.spi.internal.BaseLocalModelService;
import com.dianping.cat.service.ModelPeriod;
import com.dianping.cat.service.ModelRequest;
import com.dianping.cat.service.ModelResponse;
import com.dianping.cat.storage.message.LocalMessageBucketManager;
import com.dianping.cat.storage.message.MessageBucketManager;
import com.dianping.cat.consumer.dump.LocalMessageBucketManager;
public class LocalMessageService extends BaseLocalModelService<String> {
@Inject(LocalMessageBucketManager.ID)
......
......@@ -42,15 +42,23 @@ public abstract class TaskConsumer implements org.unidal.helper.Threads.Task {
protected abstract boolean processTask(Task doing);
public boolean checkTime() {
Calendar cal = Calendar.getInstance();
int minute = cal.get(Calendar.MINUTE);
if (minute > 15) {
return true;
} else {
return false;
}
}
@Override
public void run() {
String localIp = getLoaclIp();
while (running) {
try {
Calendar cal = Calendar.getInstance();
int minute = cal.get(Calendar.MINUTE);
if (minute > 15) {
if (checkTime()) {
Task task = findDoingTask(localIp);
if (task == null) {
task = findTodoTask();
......
......@@ -15,6 +15,11 @@ public class TaskConsumerTest {
public static class TaskConsumerWrap extends TaskConsumer {
final List<Integer> replayer = new ArrayList<Integer>();
@Override
public boolean checkTime() {
return true;
}
@Override
protected Task findDoingTask(String ip) {
......@@ -97,6 +102,12 @@ public class TaskConsumerTest {
taskList.add(t);
TaskConsumerWrap consumer = new TaskConsumerWrap() {
@Override
public boolean checkTime() {
return true;
}
@Override
protected Task findDoingTask(String ip) {
super.findDoingTask(ip);
......@@ -145,6 +156,12 @@ public class TaskConsumerTest {
taskList.add(t);
TaskConsumerWrap consumer = new TaskConsumerWrap() {
@Override
public boolean checkTime() {
return true;
}
@Override
protected Task findDoingTask(String ip) {
super.findDoingTask(ip);
......@@ -176,6 +193,11 @@ public class TaskConsumerTest {
taskList.add(t);
final TaskConsumerWrap consumer = new TaskConsumerWrap() {
@Override
public boolean checkTime() {
return true;
}
@Override
protected boolean updateTodoToDoing(Task todo) {
......@@ -227,6 +249,11 @@ public class TaskConsumerTest {
taskList.add(t);
final TaskConsumerWrap consumer = new TaskConsumerWrap() {
@Override
public boolean checkTime() {
return true;
}
@Override
protected boolean updateTodoToDoing(Task todo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册