Commit 47c4bcae authored by Frankie Wu

some weird issue

Parent 2a6f5ddf
@@ -46,22 +46,22 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
    private static final long HOUR = 60 * MINUTE;

    @Inject
-   private long m_duration = 1 * HOUR;
+   private AnalyzerFactory m_factory;

    @Inject
-   private long m_extraTime = FIVE_MINUTES;
+   private LogviewUploader m_uploader;

    @Inject
-   private int m_threads = 20;
+   private long m_duration = 1 * HOUR;

    @Inject
-   private List<String> m_analyzerNames;
+   private long m_extraTime = FIVE_MINUTES;

    @Inject
-   private AnalyzerFactory m_factory;
+   private int m_threads = 20;

    @Inject
-   private LogviewUploader m_uploader;
+   private List<String> m_analyzerNames;

    private ExecutorService m_executor;
@@ -142,10 +142,13 @@ public class RealtimeConsumer extends ContainerHolder implements MessageConsumer
       m_periodManager.setName("RealtimeConsumer-PeriodManager");
       m_periodManager.start();

-      Thread uploadThread = new Thread(m_uploader);
-      uploadThread.setName("LogviewUploader");
-      uploadThread.start();
+      if (m_uploader != null) {
+         Thread uploadThread = new Thread(m_uploader);
+
+         uploadThread.setName("LogviewUploader");
+         uploadThread.start();
+         System.out.println("LogviewUploader started.");
+      }
    }

    public void setAnalyzers(String analyzers) {
...
@@ -24,9 +24,7 @@ import com.dianping.cat.consumer.problem.handler.FailureHandler;
 import com.dianping.cat.consumer.problem.handler.Handler;
 import com.dianping.cat.consumer.problem.handler.LongUrlHandler;
 import com.dianping.cat.consumer.transaction.TransactionAnalyzer;
-import com.dianping.cat.hadoop.dal.LogviewDao;
 import com.dianping.cat.hadoop.dal.ReportDao;
-import com.dianping.cat.hadoop.hdfs.FileSystemManager;
 import com.dianping.cat.message.spi.MessageCodec;
 import com.dianping.cat.message.spi.MessageConsumer;
 import com.dianping.cat.message.spi.MessagePathBuilder;
@@ -40,6 +38,7 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
       List<Component> all = new ArrayList<Component>();

       all.add(C(AnalyzerFactory.class, DefaultAnalyzerFactory.class));
+      all.add(C(LogviewUploader.class));

       all.add(C(MessageConsumer.class, "realtime", RealtimeConsumer.class) //
             .req(AnalyzerFactory.class, LogviewUploader.class) //
@@ -78,11 +77,8 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
       all.add(C(DumpChannelManager.class) //
             .req(MessageCodec.class, "plain-text"));

-      all.add(C(LogviewUploader.class) //
-            .req(ServerConfigManager.class, FileSystemManager.class) //
-            .req(BucketManager.class, LogviewDao.class));
-
       all.add(C(DumpUploader.class) //
-            .req(ServerConfigManager.class, FileSystemManager.class));
+            .req(ServerConfigManager.class));

       return all;
    }
...
@@ -74,49 +74,6 @@ public class DumpUploader implements Initializable, LogEnabled {
       }
    }

-   public void upload() {
-      m_logger.info("Scanning ");
-
-      File baseDir = new File(m_baseDir, "outbox");
-      final List<String> paths = new ArrayList<String>();
-
-      Scanners.forDir().scan(baseDir, new FileMatcher() {
-         @Override
-         public Direction matches(File base, String path) {
-            if (new File(base, path).isFile()) {
-               paths.add(path);
-            }
-
-            return Direction.DOWN;
-         }
-      });
-
-      if (paths.size() > 0) {
-         for (String path : paths) {
-            File file = new File(baseDir, path);
-
-            try {
-               m_logger.info(String.format("Start uploading(%s) to HDFS(%s) ...", file.getCanonicalPath(), path));
-
-               FileInputStream fis = new FileInputStream(file);
-               FSDataOutputStream fdos = makeHdfsOutputStream(path);
-
-               Files.forIO().copy(fis, fdos, AutoClose.INPUT_OUTPUT);
-
-               if (!file.delete()) {
-                  m_logger.warn("Can't delete file: " + file);
-               }
-
-               m_logger.info(String.format("Finish uploading(%s) to HDFS(%s).", file.getCanonicalPath(), path));
-            } catch (AccessControlException e) {
-               m_logger.error(String.format("No permission to create HDFS file(%s)!", path), e);
-            } catch (Exception e) {
-               m_logger.error(String.format("Uploading file(%s) to HDFS(%s) failed!", file, path), e);
-            }
-         }
-      }
-   }

    class WriteJob extends Thread {
       private volatile boolean m_active = true;
@@ -141,5 +98,46 @@ public class DumpUploader implements Initializable, LogEnabled {
       public void shutdown() {
          m_active = false;
       }
+
+      private void upload() {
+         File baseDir = new File(m_baseDir, "outbox");
+         final List<String> paths = new ArrayList<String>();
+
+         Scanners.forDir().scan(baseDir, new FileMatcher() {
+            @Override
+            public Direction matches(File base, String path) {
+               if (new File(base, path).isFile()) {
+                  paths.add(path);
+               }
+
+               return Direction.DOWN;
+            }
+         });
+
+         if (paths.size() > 0) {
+            for (String path : paths) {
+               File file = new File(baseDir, path);
+
+               try {
+                  m_logger.info(String.format("Start uploading(%s) to HDFS(%s) ...", file.getCanonicalPath(), path));
+
+                  FileInputStream fis = new FileInputStream(file);
+                  FSDataOutputStream fdos = makeHdfsOutputStream(path);
+
+                  Files.forIO().copy(fis, fdos, AutoClose.INPUT_OUTPUT);
+
+                  if (!file.delete()) {
+                     m_logger.warn("Can't delete file: " + file);
+                  }
+
+                  m_logger.info(String.format("Finish uploading(%s) to HDFS(%s).", file.getCanonicalPath(), path));
+               } catch (AccessControlException e) {
+                  m_logger.error(String.format("No permission to create HDFS file(%s)!", path), e);
+               } catch (Exception e) {
+                  m_logger.error(String.format("Uploading file(%s) to HDFS(%s) failed!", file, path), e);
+               }
+            }
+         }
+      }
    }
 }
@@ -32,9 +32,10 @@ import com.site.helper.Files;
 import com.site.helper.Files.AutoClose;
 import com.site.helper.Joiners;
 import com.site.helper.Splitters;
+import com.site.lookup.ContainerHolder;
 import com.site.lookup.annotation.Inject;

-public class LogviewUploader implements Runnable, Initializable, LogEnabled {
+public class LogviewUploader extends ContainerHolder implements Runnable, Initializable, LogEnabled {
    @Inject
    private ServerConfigManager m_configManager;
@@ -64,6 +65,11 @@ public class LogviewUploader implements Runnable, Initializable, LogEnabled {
    @Override
    public void initialize() throws InitializationException {
+      m_configManager = lookup(ServerConfigManager.class);
+      m_fileSystemManager = lookup(FileSystemManager.class);
+      m_bucketManager = lookup(BucketManager.class);
+      m_logviewDao = lookup(LogviewDao.class);
+
       ServerConfig serverConfig = m_configManager.getServerConfig();

       if (serverConfig != null) {
...
@@ -4,6 +4,10 @@
       <role>com.dianping.cat.consumer.AnalyzerFactory</role>
       <implementation>com.dianping.cat.consumer.DefaultAnalyzerFactory</implementation>
    </component>
+   <component>
+      <role>com.dianping.cat.consumer.logview.LogviewUploader</role>
+      <implementation>com.dianping.cat.consumer.logview.LogviewUploader</implementation>
+   </component>
    <component>
       <role>com.dianping.cat.message.spi.MessageConsumer</role>
       <role-hint>realtime</role-hint>
@@ -132,24 +136,6 @@
          </requirement>
       </requirements>
    </component>
-   <component>
-      <role>com.dianping.cat.consumer.logview.LogviewUploader</role>
-      <implementation>com.dianping.cat.consumer.logview.LogviewUploader</implementation>
-      <requirements>
-         <requirement>
-            <role>com.dianping.cat.configuration.ServerConfigManager</role>
-         </requirement>
-         <requirement>
-            <role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
-         </requirement>
-         <requirement>
-            <role>com.dianping.cat.storage.BucketManager</role>
-         </requirement>
-         <requirement>
-            <role>com.dianping.cat.hadoop.dal.LogviewDao</role>
-         </requirement>
-      </requirements>
-   </component>
    <component>
       <role>com.dianping.cat.consumer.dump.DumpUploader</role>
       <implementation>com.dianping.cat.consumer.dump.DumpUploader</implementation>
@@ -157,9 +143,6 @@
          <requirement>
            <role>com.dianping.cat.configuration.ServerConfigManager</role>
          </requirement>
-         <requirement>
-            <role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
-         </requirement>
       </requirements>
    </component>
 </components>
...
package com.dianping.cat.consumer;

import junit.framework.Assert;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class MyTest {
   public static void main(String[] args) {
      System.out.println("second");
      System.out.println("second");
      System.out.println("second");
      System.out.println("second");
      System.out.println("second");

      // second comment
      System.out.println("second");
      System.out.println("second");
      System.out.println("second");
      System.out.println("second");
      System.out.println("second");
      System.out.println("second");
   }

   @Test
   public void firstCase() {
      Assert.assertEquals("First", "Fir" + "st");
   }

   @Test
   @Ignore("need database connection")
   public void secondCase() {
      System.out.println("Second");
   }
}
@@ -16,15 +16,11 @@ import com.site.lookup.ComponentTestCase;
 public class LogviewUploaderTest extends ComponentTestCase {
    @Test
    public void test() throws Exception {
-      BucketManager manager = lookup(BucketManager.class);
       LogviewUploader uploader = lookup(LogviewUploader.class);
+      BucketManager manager = lookup(BucketManager.class);
       long timestamp = 1334122638154L; // [04-11 13:37:18.154]
       String domain = "test";
-
-      Thread thread = new Thread(uploader);
-      thread.start();

       Bucket<MessageTree> bucket = manager.getLogviewBucket(timestamp, domain);

       for (int i = 0; i < 1000; i++) {
@@ -35,8 +31,6 @@ public class LogviewUploaderTest extends ComponentTestCase {
       bucket.flush();
       uploader.addBucket(timestamp, domain);
-
-      thread.join();
    }

    private DefaultMessageTree newMessageTree(int i, long timestamp) {
...
package com.dianping.cat.hadoop;

import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject;

public class DumpToHdfsConsumer implements MessageConsumer {
   public static final String ID = "dump-to-hdfs";

   @Inject
   private MessageStorage m_storage;

   @Inject
   private String m_domain;

   @Override
   public void consume(MessageTree tree) {
      if (m_domain == null || m_domain.equals(tree.getDomain())) {
         m_storage.store(tree);
      }
   }

   @Override
   public String getConsumerId() {
      return ID;
   }

   public void setDomain(String domain) {
      m_domain = domain;
   }
}
@@ -4,7 +4,6 @@ import java.util.ArrayList;
 import java.util.List;

 import com.dianping.cat.configuration.ServerConfigManager;
-import com.dianping.cat.hadoop.DumpToHdfsConsumer;
 import com.dianping.cat.hadoop.dal.LogviewDao;
 import com.dianping.cat.hadoop.dal.ReportDao;
 import com.dianping.cat.hadoop.hdfs.DefaultInputChannel;
@@ -20,7 +19,6 @@ import com.dianping.cat.hadoop.hdfs.OutputChannelManager;
 import com.dianping.cat.hadoop.storage.RemoteMessageBucket;
 import com.dianping.cat.hadoop.storage.RemoteStringBucket;
 import com.dianping.cat.message.spi.MessageCodec;
-import com.dianping.cat.message.spi.MessageConsumer;
 import com.dianping.cat.message.spi.MessagePathBuilder;
 import com.dianping.cat.message.spi.MessageStorage;
 import com.dianping.cat.message.spi.MessageTree;
@@ -48,8 +46,6 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
       all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
             .req(OutputChannelManager.class) //
             .req(MessagePathBuilder.class));
-      all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) //
-            .req(MessageStorage.class, "hdfs"));

       all.add(C(Bucket.class, String.class.getName() + "-remote", RemoteStringBucket.class) //
             .is(PER_LOOKUP) //
...
@@ -8,30 +8,30 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.SecurityUtil;
+import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
+import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;

 import com.dianping.cat.configuration.ServerConfigManager;
 import com.dianping.cat.configuration.server.entity.HdfsConfig;
 import com.dianping.cat.configuration.server.entity.Property;
 import com.dianping.cat.configuration.server.entity.ServerConfig;
-import com.site.lookup.ContainerHolder;
 import com.site.lookup.annotation.Inject;

-public class FileSystemManager extends ContainerHolder {
+public class FileSystemManager implements Initializable {
    @Inject
-   private ServerConfigManager m_manager;
+   private ServerConfigManager m_configManager;

    @Inject
-   private String m_defaultBaseDir = "target/hdfs";
+   private String m_defaultBaseDir = "target/bucket/hdfs";

    private Map<String, FileSystem> m_fileSystems = new HashMap<String, FileSystem>();

    public ServerConfig getServerConfig() {
-      return m_manager.getServerConfig();
+      return m_configManager.getServerConfig();
    }

    public FileSystem getFileSystem(String id, StringBuilder baseDir) throws IOException {
-      ServerConfig config = m_manager.getServerConfig();
-      HdfsConfig hdfsConfig = config.getStorage().findHdfs(id);
+      ServerConfig config = m_configManager.getServerConfig();
+      HdfsConfig hdfsConfig = config == null ? null : config.getStorage().findHdfs(id);
       FileSystem fs = m_fileSystems.get(id);

       if (hdfsConfig == null) {
@@ -73,26 +73,30 @@ public class FileSystemManager extends ContainerHolder {
    // be an ip address
    private Configuration getHdfsConfiguration() throws IOException {
       Configuration config = new Configuration();
-      Map<String, Property> properties = m_manager.getServerConfig().getStorage().getProperties();
-      Property authentication = properties.get("hadoop.security.authentication");
-
-      for (Property property : properties.values()) {
-         config.set(property.getName(), property.getValue());
-      }
-
-      config.setInt("io.file.buffer.size", 8192);
-
-      if (authentication != null && "kerberos".equals(authentication.getValue())) {
-         // For MAC OS X
-         // -Djava.security.krb5.realm=OX.AC.UK
-         // -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk
-         System.setProperty("java.security.krb5.realm",
-               getValue(properties.get("java.security.krb5.realm"), "DIANPING.COM"));
-         System.setProperty("java.security.krb5.kdc",
-               getValue(properties.get("java.security.krb5.kdc"), "192.168.7.80"));
-         SecurityUtil.login(config, "dfs.cat.keytab.file", "dfs.cat.kerberos.principal");
-      }
+      ServerConfig serverConfig = m_configManager.getServerConfig();
+
+      config.setInt("io.file.buffer.size", 8192);
+
+      if (serverConfig != null) {
+         Map<String, Property> properties = serverConfig.getStorage().getProperties();
+         Property authentication = properties.get("hadoop.security.authentication");
+
+         for (Property property : properties.values()) {
+            config.set(property.getName(), property.getValue());
+         }
+
+         if (authentication != null && "kerberos".equals(authentication.getValue())) {
+            // For MAC OS X
+            // -Djava.security.krb5.realm=OX.AC.UK
+            // -Djava.security.krb5.kdc=kdc0.ox.ac.uk:kdc1.ox.ac.uk
+            System.setProperty("java.security.krb5.realm",
+                  getValue(properties.get("java.security.krb5.realm"), "DIANPING.COM"));
+            System.setProperty("java.security.krb5.kdc",
+                  getValue(properties.get("java.security.krb5.kdc"), "192.168.7.80"));
+
+            SecurityUtil.login(config, "dfs.cat.keytab.file", "dfs.cat.kerberos.principal");
+         }
+      }

       return config;
@@ -108,7 +112,12 @@ public class FileSystemManager extends ContainerHolder {
       return defaultValue;
    }

-   public void setDefaultBaseDir(String defaultBaseDir) {
-      m_defaultBaseDir = defaultBaseDir;
+   @Override
+   public void initialize() throws InitializationException {
+      ServerConfig serverConfig = m_configManager.getServerConfig();
+
+      if (serverConfig != null) {
+         m_defaultBaseDir = serverConfig.getStorage().getLocalBaseDir() + "/hdfs";
+      }
    }
 }
package com.dianping.cat.job.hdfs;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import com.dianping.cat.hadoop.hdfs.FileSystemManager;
import com.site.lookup.ComponentTestCase;

@RunWith(JUnit4.class)
public class FileSystemManagerTest extends ComponentTestCase {
   @Test
   public void test() throws Exception {
      FileSystemManager manager = lookup(FileSystemManager.class);
      StringBuilder baseDir = new StringBuilder();

      System.out.println(manager.getFileSystem("test", baseDir));
      System.out.println(baseDir);
   }
}
@@ -3,7 +3,6 @@ package com.dianping.cat.report.build;
 import java.util.ArrayList;
 import java.util.List;

-import com.dianping.cat.hadoop.DumpToHdfsConsumer;
 import com.dianping.cat.message.spi.MessageConsumer;
 import com.dianping.cat.message.spi.MessageConsumerRegistry;
 import com.dianping.cat.message.spi.internal.DefaultMessageConsumerRegistry;
@@ -18,15 +17,9 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
    @Override
    public List<Component> defineComponents() {
       List<Component> all = new ArrayList<Component>();
-      String defaultLocalMode = "true";
-
-      if ("true".equals(defaultLocalMode)) {
-         all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
-               .req(MessageConsumer.class, new String[] { "realtime" }, "m_consumers"));
-      } else {
-         all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
-               .req(MessageConsumer.class, new String[] { "realtime", DumpToHdfsConsumer.ID }, "m_consumers"));
-      }
+      all.add(C(MessageConsumerRegistry.class, DefaultMessageConsumerRegistry.class) //
+            .req(MessageConsumer.class, new String[] { "realtime" }, "m_consumers"));

       all.add(C(ValueTranslater.class, DefaultValueTranslater.class));
       all.add(C(GraphBuilder.class, DefaultGraphBuilder.class) //
...
@@ -14,7 +14,6 @@ import com.dianping.cat.report.page.model.spi.ModelRequest;
 import com.dianping.cat.report.page.model.spi.ModelResponse;
 import com.dianping.cat.report.page.model.spi.ModelService;
 import com.site.lookup.annotation.Inject;
-import com.site.lookup.util.StringUtils;
 import com.site.web.mvc.PageHandler;
 import com.site.web.mvc.annotation.InboundActionMeta;
 import com.site.web.mvc.annotation.OutboundActionMeta;
...
@@ -282,6 +282,12 @@
       <role>com.dianping.cat.consumer.RealtimeConsumer</role>
       <implementation>com.dianping.cat.consumer.RealtimeConsumer</implementation>
       <requirements>
+         <requirement>
+            <role>com.dianping.cat.consumer.AnalyzerFactory</role>
+         </requirement>
+         <requirement>
+            <role>com.dianping.cat.consumer.logview.LogviewUploader</role>
+         </requirement>
          <requirement>
             <role>long</role>
          </requirement>
@@ -294,20 +300,48 @@
          <requirement>
             <role>java.util.List</role>
          </requirement>
       </requirements>
    </component>
+   <component>
+      <role>com.dianping.cat.consumer.logview.LogviewUploader</role>
+      <implementation>com.dianping.cat.consumer.logview.LogviewUploader</implementation>
+      <requirements>
          <requirement>
-            <role>com.dianping.cat.consumer.AnalyzerFactory</role>
+            <role>com.dianping.cat.configuration.ServerConfigManager</role>
          </requirement>
          <requirement>
-            <role>com.dianping.cat.consumer.logview.LogviewUploader</role>
+            <role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
          </requirement>
+         <requirement>
+            <role>com.dianping.cat.storage.BucketManager</role>
+         </requirement>
+         <requirement>
+            <role>com.dianping.cat.hadoop.dal.LogviewDao</role>
+         </requirement>
+      </requirements>
+   </component>
    <component>
-      <role>com.dianping.cat.consumer.logview.LogviewUploader</role>
-      <implementation>com.dianping.cat.consumer.logview.LogviewUploader</implementation>
+      <role>com.dianping.cat.configuration.ServerConfigManager</role>
+      <implementation>com.dianping.cat.configuration.ServerConfigManager</implementation>
    </component>
    <component>
+      <role>com.dianping.cat.hadoop.hdfs.FileSystemManager</role>
+      <implementation>com.dianping.cat.hadoop.hdfs.FileSystemManager</implementation>
+      <requirements>
+         <requirement>
+            <role>com.dianping.cat.configuration.ServerConfigManager</role>
+         </requirement>
+         <requirement>
+            <role>java.lang.String</role>
+         </requirement>
+      </requirements>
+   </component>
+   <component>
       <role>com.dianping.cat.hadoop.dal.LogviewDao</role>
       <implementation>com.dianping.cat.hadoop.dal.LogviewDao</implementation>
       <requirements>
          <requirement>
-            <role>com.dianping.cat.message.spi.MessagePathBuilder</role>
+            <role>com.site.dal.jdbc.QueryEngine</role>
          </requirement>
       </requirements>
    </component>
...