提交 d1fd207e 编写于 作者: S sean.wang

hdfs config fix

上级 1a917a49
...@@ -3,11 +3,14 @@ package com.dianping.cat.message.spi.internal; ...@@ -3,11 +3,14 @@ package com.dianping.cat.message.spi.internal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessageConsumer; import com.dianping.cat.message.spi.MessageConsumer;
import com.dianping.cat.message.spi.MessageConsumerRegistry; import com.dianping.cat.message.spi.MessageConsumerRegistry;
import com.site.lookup.annotation.Inject; import com.site.lookup.annotation.Inject;
public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry { public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry, Initializable {
@Inject @Inject
private List<MessageConsumer> m_consumers = new ArrayList<MessageConsumer>(); private List<MessageConsumer> m_consumers = new ArrayList<MessageConsumer>();
...@@ -20,4 +23,10 @@ public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry { ...@@ -20,4 +23,10 @@ public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry {
public void registerConsumer(MessageConsumer consumer) { public void registerConsumer(MessageConsumer consumer) {
m_consumers.add(consumer); m_consumers.add(consumer);
} }
@Override
public void initialize() throws InitializationException {
// a workaround to Plexus ComponentList bug:
// the injected list may be a live Plexus-managed proxy; copying it into a
// plain ArrayList detaches it from the container so later add() calls
// (registerConsumer) behave like a normal java.util.List.
m_consumers = new ArrayList<MessageConsumer>(m_consumers);
}
} }
...@@ -51,6 +51,23 @@ ...@@ -51,6 +51,23 @@
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId>
<version>1.0.14</version>
<executions>
<execution>
<id>generate problem report model</id>
<phase>generate-sources</phase>
<goals>
<goal>dal-model</goal>
</goals>
<configuration>
<manifest>${basedir}/src/main/resources/META-INF/dal/model/configuration-manifest.xml</manifest>
</configuration>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<groupId>com.site.maven.plugins</groupId> <groupId>com.site.maven.plugins</groupId>
<artifactId>maven-codegen-plugin</artifactId> <artifactId>maven-codegen-plugin</artifactId>
......
...@@ -4,6 +4,7 @@ import java.util.ArrayList; ...@@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import com.dianping.cat.job.DumpToHdfsConsumer; import com.dianping.cat.job.DumpToHdfsConsumer;
import com.dianping.cat.job.configuration.HdfsConfig;
import com.dianping.cat.job.hdfs.DefaultInputChannel; import com.dianping.cat.job.hdfs.DefaultInputChannel;
import com.dianping.cat.job.hdfs.DefaultInputChannelManager; import com.dianping.cat.job.hdfs.DefaultInputChannelManager;
import com.dianping.cat.job.hdfs.DefaultOutputChannel; import com.dianping.cat.job.hdfs.DefaultOutputChannel;
...@@ -30,37 +31,40 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { ...@@ -30,37 +31,40 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator {
@Override @Override
public List<Component> defineComponents() { public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>(); List<Component> all = new ArrayList<Component>();
String serverUri = property("server-uri", "hdfs://192.168.7.43:9000/user/cat");
all.add(C(HdfsConfig.class));
all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) // all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text") // .req(MessageCodec.class, "plain-text") //
.config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L)))); .config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L))));
all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) // all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class) // .req(MessagePathBuilder.class) //
.config(E("baseDir").value("data"), // .req(HdfsConfig.class) //
E("serverUri").value(serverUri))); .config(E("type").value("data"))//
.config(E("baseDir").value("data")));
all.add(C(InputChannel.class, DefaultInputChannel.class).is(PER_LOOKUP) // all.add(C(InputChannel.class, DefaultInputChannel.class).is(PER_LOOKUP) //
.req(MessageCodec.class, "plain-text")); .req(MessageCodec.class, "plain-text"));
all.add(C(InputChannelManager.class, DefaultInputChannelManager.class) // all.add(C(InputChannelManager.class, DefaultInputChannelManager.class) //
.config(E("baseDir").value("data"), // .req(HdfsConfig.class) //
E("serverUri").value(serverUri))); .config(E("baseDir").value("data")));
all.add(C(OutputChannelManager.class, "dump", DefaultOutputChannelManager.class) // all.add(C(OutputChannelManager.class, "dump", DefaultOutputChannelManager.class) //
.req(MessagePathBuilder.class) // .req(MessagePathBuilder.class) //
.config(E("baseDir").value("dump"), // .config(E("type").value("dump")) //
E("serverUri").value(serverUri))); .config(E("baseDir").value("dump")));
all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) // all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) //
.req(OutputChannelManager.class, "dump").req(MessagePathBuilder.class)); .req(OutputChannelManager.class, "dump") //
.req(MessagePathBuilder.class));
all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) // all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) //
.req(MessageStorage.class, "hdfs")); .req(MessageStorage.class, "hdfs"));
all.add(C(Bucket.class, String.class.getName() + "-remote", RemoteStringBucket.class) // all.add(C(Bucket.class, String.class.getName() + "-remote", RemoteStringBucket.class) //
.is(PER_LOOKUP) // .is(PER_LOOKUP) //
.req(ReportDao.class)); .req(ReportDao.class));
all.add(C(Bucket.class, MessageTree.class.getName() + "-remote", RemoteMessageBucket.class) // all.add(C(Bucket.class, MessageTree.class.getName() + "-remote", RemoteMessageBucket.class) //
.is(PER_LOOKUP) // .is(PER_LOOKUP) //
.req(OutputChannelManager.class, InputChannelManager.class) // .req(OutputChannelManager.class, InputChannelManager.class) //
.req(LogviewDao.class, MessagePathBuilder.class)); .req(LogviewDao.class, MessagePathBuilder.class));
all.addAll(new DatabaseConfigurator().defineComponents()); all.addAll(new DatabaseConfigurator().defineComponents());
......
package com.dianping.cat.job.configuration;
import java.io.File;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.job.configuration.model.entity.Config;
import com.dianping.cat.job.configuration.model.transform.DefaultXmlParser;
import com.site.helper.Files;
/**
 * Loads the HDFS server URLs from an XML configuration file on the deployment
 * host. Expected entries are {@code <hdfs id="data" .../>} and
 * {@code <hdfs id="dump" .../>}; their {@code path} attributes become the
 * server and dump URLs respectively.
 */
public class HdfsConfig implements Initializable {
// Fixed deployment location of the HDFS configuration file.
private static final String CONFIG_PATH = "/data/appdatas/cat/hdfs.xml";
private String m_serverUrl;
private String m_dumpUrl;
public String getServerUrl() {
return m_serverUrl;
}
public String getDumpUrl() {
return m_dumpUrl;
}
@Override
public void initialize() throws InitializationException {
try {
String xml = Files.forIO().readFrom(new File(CONFIG_PATH), "utf-8");
Config config = new DefaultXmlParser().parse(xml);
// Fail fast with a precise message instead of an NPE when an
// expected <hdfs> entry is missing from the file.
if (config.getHdfses().get("data") == null) {
throw new InitializationException("Missing <hdfs id=\"data\"> entry in " + CONFIG_PATH);
}
if (config.getHdfses().get("dump") == null) {
throw new InitializationException("Missing <hdfs id=\"dump\"> entry in " + CONFIG_PATH);
}
m_serverUrl = config.getHdfses().get("data").getPath();
m_dumpUrl = config.getHdfses().get("dump").getPath();
} catch (InitializationException e) {
// Preserve the precise message produced above.
throw e;
} catch (Exception e) {
// I/O or parse failure: wrap with the file path for diagnosability.
throw new InitializationException("Init hdfs file config error: " + CONFIG_PATH, e);
}
}
}
...@@ -10,6 +10,7 @@ import org.apache.hadoop.fs.Path; ...@@ -10,6 +10,7 @@ import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.job.configuration.HdfsConfig;
import com.site.lookup.ContainerHolder; import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject; import com.site.lookup.annotation.Inject;
...@@ -20,6 +21,9 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input ...@@ -20,6 +21,9 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
@Inject @Inject
private String m_baseDir = "target/hdfs"; private String m_baseDir = "target/hdfs";
@Inject
private HdfsConfig m_hdfsConfig;
private FileSystem m_fs; private FileSystem m_fs;
private Path m_basePath; private Path m_basePath;
...@@ -46,6 +50,7 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input ...@@ -46,6 +50,7 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
config.setInt("io.file.buffer.size", 8192); config.setInt("io.file.buffer.size", 8192);
this.setServerUri(this.m_hdfsConfig.getServerUrl());
if (m_serverUri == null) { if (m_serverUri == null) {
fs = FileSystem.getLocal(config); fs = FileSystem.getLocal(config);
m_basePath = new Path(fs.getWorkingDirectory(), m_baseDir); m_basePath = new Path(fs.getWorkingDirectory(), m_baseDir);
......
...@@ -17,22 +17,27 @@ import org.codehaus.plexus.logging.Logger; ...@@ -17,22 +17,27 @@ import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.job.configuration.HdfsConfig;
import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree; import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder; import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject; import com.site.lookup.annotation.Inject;
public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled {
LogEnabled {
@Inject @Inject
private MessagePathBuilder m_builder; private MessagePathBuilder m_builder;
@Inject @Inject
private String m_baseDir = "target/hdfs"; private String m_baseDir = "target/hdfs";
@Inject
private URI m_serverUri; private URI m_serverUri;
@Inject
private HdfsConfig m_hdfsConfig;
@Inject
private String m_type = "data";
private FileSystem m_fs; private FileSystem m_fs;
private Path m_basePath; private Path m_basePath;
...@@ -88,6 +93,11 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp ...@@ -88,6 +93,11 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp
Configuration config = new Configuration(); Configuration config = new Configuration();
FileSystem fs; FileSystem fs;
if ("data".equals(this.m_type)) {
this.setServerUri(this.m_hdfsConfig.getServerUrl());
} else if ("dump".equals(this.m_type)) {
this.setServerUri(this.m_hdfsConfig.getDumpUrl());
}
config.setInt("io.file.buffer.size", 8192); config.setInt("io.file.buffer.size", 8192);
if (m_serverUri == null) { if (m_serverUri == null) {
fs = FileSystem.getLocal(config); fs = FileSystem.getLocal(config);
...@@ -148,4 +158,8 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp ...@@ -148,4 +158,8 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp
m_serverUri = URI.create(serverUri); m_serverUri = URI.create(serverUri);
} }
} }
// Sets the channel type ("data" or "dump") used to select which HDFS URL
// from HdfsConfig this manager connects to; injected via component
// configuration (<type> element in plexus components.xml).
public void setType(String type) {
this.m_type = type;
}
} }
...@@ -10,6 +10,8 @@ public interface OutputChannelManager { ...@@ -10,6 +10,8 @@ public interface OutputChannelManager {
public void closeAllChannels(); public void closeAllChannels();
public void closeChannel(OutputChannel channel); public void closeChannel(OutputChannel channel);
public void setServerUri(String serverUri);
public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException; public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException;
......
...@@ -98,14 +98,12 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled { ...@@ -98,14 +98,12 @@ public class RemoteMessageBucket implements Bucket<MessageTree>, LogEnabled {
} }
try { try {
Logview logview = m_logviewDao.findNextByMessageIdTags(id, direction, tagThread, tagSession, tagRequest, Logview logview = m_logviewDao.findNextByMessageIdTags(id, direction, tagThread, tagSession, tagRequest, LogviewEntity.READSET_FULL);
LogviewEntity.READSET_FULL);
MessageTree tree = readMessageTree(logview); MessageTree tree = readMessageTree(logview);
return tree; return tree;
} catch (DalException e) { } catch (DalException e) {
String message = String.format("Unable to find next message(%s) with tag(%s) and direction(%s)!", id, tagName, String message = String.format("Unable to find next message(%s) with tag(%s) and direction(%s)!", id, tagName, direction);
direction);
m_logger.error(message, e); m_logger.error(message, e);
return null; return null;
......
...@@ -47,7 +47,7 @@ ...@@ -47,7 +47,7 @@
<statement><![CDATA[ <statement><![CDATA[
SELECT <FIELDS/> SELECT <FIELDS/>
FROM <TABLE/> FROM <TABLE/>
WHERE <FIELD name='message-id'/> = ${message-id} </IN> WHERE <FIELD name='message-id'/> = ${message-id}
]]></statement> ]]></statement>
</query> </query>
</query-defs> </query-defs>
......
<?xml version="1.0" encoding="UTF-8"?>
<model>
<entity name="config" root="true">
<entity-ref name="hdfs" type="list" names="hdfses" />
</entity>
<entity name="hdfs">
<attribute name="id" value-type="String" />
<attribute name="path" value-type="String" />
</entity>
</model>
<?xml version="1.0" encoding="UTF-8"?>
<model model-package="com.dianping.cat.job.configuration.model" enable-merger="true" enable-json-builder="true"
enable-xml-parser="true" enable-base-visitor="true">
<entity name="config" root="true">
<entity-ref name="hdfs" type="map" names="hdfses" />
</entity>
<entity name="hdfs">
<attribute name="id" value-type="String" key="true" />
<attribute name="path" value-type="String" />
</entity>
</model>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<manifest>
<file path="configuration-codegen.xml" />
<file path="configuration-model.xml" />
</manifest>
<plexus> <plexus>
<components> <components>
<component>
<role>com.dianping.cat.job.configuration.HdfsConfig</role>
<implementation>com.dianping.cat.job.configuration.HdfsConfig</implementation>
</component>
<component> <component>
<role>com.dianping.cat.job.hdfs.OutputChannel</role> <role>com.dianping.cat.job.hdfs.OutputChannel</role>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannel</implementation> <implementation>com.dianping.cat.job.hdfs.DefaultOutputChannel</implementation>
...@@ -18,13 +22,16 @@ ...@@ -18,13 +22,16 @@
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role> <role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation> <implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation>
<configuration> <configuration>
<type>data</type>
<baseDir>data</baseDir> <baseDir>data</baseDir>
<serverUri>hdfs://192.168.7.43:9000/user/cat</serverUri>
</configuration> </configuration>
<requirements> <requirements>
<requirement> <requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role> <role>com.dianping.cat.message.spi.MessagePathBuilder</role>
</requirement> </requirement>
<requirement>
<role>com.dianping.cat.job.configuration.HdfsConfig</role>
</requirement>
</requirements> </requirements>
</component> </component>
<component> <component>
...@@ -43,16 +50,20 @@ ...@@ -43,16 +50,20 @@
<implementation>com.dianping.cat.job.hdfs.DefaultInputChannelManager</implementation> <implementation>com.dianping.cat.job.hdfs.DefaultInputChannelManager</implementation>
<configuration> <configuration>
<baseDir>data</baseDir> <baseDir>data</baseDir>
<serverUri>hdfs://192.168.7.43:9000/user/cat</serverUri>
</configuration> </configuration>
<requirements>
<requirement>
<role>com.dianping.cat.job.configuration.HdfsConfig</role>
</requirement>
</requirements>
</component> </component>
<component> <component>
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role> <role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
<role-hint>dump</role-hint> <role-hint>dump</role-hint>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation> <implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation>
<configuration> <configuration>
<type>dump</type>
<baseDir>dump</baseDir> <baseDir>dump</baseDir>
<serverUri>hdfs://192.168.7.43:9000/user/cat</serverUri>
</configuration> </configuration>
<requirements> <requirements>
<requirement> <requirement>
......
<config>
<hdfs id="data" path="hdfs://192.168.7.43:9000/user/cat"/>
<hdfs id="dump" path="hdfs://192.168.7.43:9000/user/cat"/>
</config>
<config>
<hdfs id="data" path="hdfs://192.168.7.43:9000/user/cat"/>
<hdfs id="dump" path="hdfs://192.168.7.43:9000/user/cat"/>
</config>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册