From d1fd207efd7f9d2eeb69e0d16a862dca546b4ed9 Mon Sep 17 00:00:00 2001 From: "sean.wang" Date: Wed, 28 Mar 2012 18:42:21 +0800 Subject: [PATCH] hdfs config fix --- .../DefaultMessageConsumerRegistry.java | 11 ++++- cat-job/pom.xml | 17 ++++++++ .../cat/job/build/ComponentsConfigurator.java | 42 ++++++++++--------- .../cat/job/configuration/HdfsConfig.java | 37 ++++++++++++++++ .../job/hdfs/DefaultInputChannelManager.java | 5 +++ .../job/hdfs/DefaultOutputChannelManager.java | 20 +++++++-- .../cat/job/hdfs/OutputChannelManager.java | 2 + .../cat/job/storage/RemoteMessageBucket.java | 6 +-- .../META-INF/dal/jdbc/dal-report.xml | 2 +- .../dal/model/configuration-codegen.xml | 11 +++++ .../dal/model/configuration-model.xml | 11 +++++ .../resources/META-INF/dal/model/manifest.xml | 6 +++ .../resources/META-INF/plexus/components.xml | 17 ++++++-- .../dianping/cat/job/configuration/config.xml | 4 ++ .../cat/job/configuration/model/config.xml | 4 ++ 15 files changed, 164 insertions(+), 31 deletions(-) create mode 100644 cat-job/src/main/java/com/dianping/cat/job/configuration/HdfsConfig.java create mode 100644 cat-job/src/main/resources/META-INF/dal/model/configuration-codegen.xml create mode 100644 cat-job/src/main/resources/META-INF/dal/model/configuration-model.xml create mode 100644 cat-job/src/main/resources/META-INF/dal/model/manifest.xml create mode 100644 cat-job/src/test/resources/com/dianping/cat/job/configuration/config.xml create mode 100644 cat-job/src/test/resources/com/dianping/cat/job/configuration/model/config.xml diff --git a/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageConsumerRegistry.java b/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageConsumerRegistry.java index ad3eb3142..a7fa420c2 100644 --- a/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageConsumerRegistry.java +++ b/cat-core/src/main/java/com/dianping/cat/message/spi/internal/DefaultMessageConsumerRegistry.java @@ -3,11 +3,14 @@ package com.dianping.cat.message.spi.internal; import java.util.ArrayList; import java.util.List; +import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; +import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; + import com.dianping.cat.message.spi.MessageConsumer; import com.dianping.cat.message.spi.MessageConsumerRegistry; import com.site.lookup.annotation.Inject; -public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry { +public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry, Initializable { @Inject private List m_consumers = new ArrayList(); @@ -20,4 +23,10 @@ public class DefaultMessageConsumerRegistry implements MessageConsumerRegistry { public void registerConsumer(MessageConsumer consumer) { m_consumers.add(consumer); } + + @Override + public void initialize() throws InitializationException { + // a workaround to Plexus ComponentList bug + m_consumers = new ArrayList(m_consumers); + } } diff --git a/cat-job/pom.xml b/cat-job/pom.xml index 5bbbd66a2..df9427095 100644 --- a/cat-job/pom.xml +++ b/cat-job/pom.xml @@ -51,6 +51,23 @@ + + com.site.maven.plugins + maven-codegen-plugin + 1.0.14 + + + generate problem report model + generate-sources + + dal-model + + + ${basedir}/src/main/resources/META-INF/dal/model/configuration-manifest.xml + + + + com.site.maven.plugins maven-codegen-plugin diff --git a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java index 3e10ad848..7f9a7d3fc 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java +++ b/cat-job/src/main/java/com/dianping/cat/job/build/ComponentsConfigurator.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import com.dianping.cat.job.DumpToHdfsConsumer; +import com.dianping.cat.job.configuration.HdfsConfig; import com.dianping.cat.job.hdfs.DefaultInputChannel; import com.dianping.cat.job.hdfs.DefaultInputChannelManager; import com.dianping.cat.job.hdfs.DefaultOutputChannel; @@ -30,37 +31,40 @@ public class ComponentsConfigurator extends AbstractResourceConfigurator { @Override public List defineComponents() { List all = new ArrayList(); - String serverUri = property("server-uri", "hdfs://192.168.7.43:9000/user/cat"); + + all.add(C(HdfsConfig.class)); all.add(C(OutputChannel.class, DefaultOutputChannel.class).is(PER_LOOKUP) // - .req(MessageCodec.class, "plain-text") // - .config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L)))); + .req(MessageCodec.class, "plain-text") // + .config(E("maxSize").value(String.valueOf(128 * 1024 * 1024L)))); all.add(C(OutputChannelManager.class, DefaultOutputChannelManager.class) // - .req(MessagePathBuilder.class) // - .config(E("baseDir").value("data"), // - E("serverUri").value(serverUri))); + .req(MessagePathBuilder.class) // + .req(HdfsConfig.class) // + .config(E("type").value("data"))// + .config(E("baseDir").value("data"))); all.add(C(InputChannel.class, DefaultInputChannel.class).is(PER_LOOKUP) // - .req(MessageCodec.class, "plain-text")); + .req(MessageCodec.class, "plain-text")); all.add(C(InputChannelManager.class, DefaultInputChannelManager.class) // - .config(E("baseDir").value("data"), // - E("serverUri").value(serverUri))); + .req(HdfsConfig.class) // + .config(E("baseDir").value("data"))); all.add(C(OutputChannelManager.class, "dump", DefaultOutputChannelManager.class) // - .req(MessagePathBuilder.class) // - .config(E("baseDir").value("dump"), // - E("serverUri").value(serverUri))); + .req(MessagePathBuilder.class) // + .config(E("type").value("dump")) // + .config(E("baseDir").value("dump"))); all.add(C(MessageStorage.class, "hdfs", HdfsMessageStorage.class) // - .req(OutputChannelManager.class, "dump").req(MessagePathBuilder.class)); + .req(OutputChannelManager.class, "dump") // + .req(MessagePathBuilder.class)); all.add(C(MessageConsumer.class, DumpToHdfsConsumer.ID, DumpToHdfsConsumer.class) // - .req(MessageStorage.class, "hdfs")); + .req(MessageStorage.class, "hdfs")); all.add(C(Bucket.class, String.class.getName() + "-remote", RemoteStringBucket.class) // - .is(PER_LOOKUP) // - .req(ReportDao.class)); + .is(PER_LOOKUP) // + .req(ReportDao.class)); all.add(C(Bucket.class, MessageTree.class.getName() + "-remote", RemoteMessageBucket.class) // - .is(PER_LOOKUP) // - .req(OutputChannelManager.class, InputChannelManager.class) // - .req(LogviewDao.class, MessagePathBuilder.class)); + .is(PER_LOOKUP) // + .req(OutputChannelManager.class, InputChannelManager.class) // + .req(LogviewDao.class, MessagePathBuilder.class)); all.addAll(new DatabaseConfigurator().defineComponents()); diff --git a/cat-job/src/main/java/com/dianping/cat/job/configuration/HdfsConfig.java b/cat-job/src/main/java/com/dianping/cat/job/configuration/HdfsConfig.java new file mode 100644 index 000000000..62758a60c --- /dev/null +++ b/cat-job/src/main/java/com/dianping/cat/job/configuration/HdfsConfig.java @@ -0,0 +1,37 @@ +package com.dianping.cat.job.configuration; + +import java.io.File; + +import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; +import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; + +import com.dianping.cat.job.configuration.model.entity.Config; +import com.dianping.cat.job.configuration.model.transform.DefaultXmlParser; +import com.site.helper.Files; + +public class HdfsConfig implements Initializable { + + private String m_serverUrl; + private String m_dumpUrl; + + public String getServerUrl() { + return m_serverUrl; + } + + public String getDumpUrl() { + return m_dumpUrl; + } + + @Override + public void initialize() throws InitializationException { + String path = "/data/appdatas/cat/hdfs.xml"; + try { + String xml = Files.forIO().readFrom(new File(path), "utf-8"); + Config config = new DefaultXmlParser().parse(xml); + m_serverUrl = config.getHdfses().get("data").getPath(); + m_dumpUrl = config.getHdfses().get("dump").getPath(); + } catch (Exception e) { + throw new InitializationException("Init hdfs file config error", e); + } + } +} diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java index 07ed4fe1b..ed4e152a2 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultInputChannelManager.java @@ -10,6 +10,7 @@ import org.apache.hadoop.fs.Path; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; +import com.dianping.cat.job.configuration.HdfsConfig; import com.site.lookup.ContainerHolder; import com.site.lookup.annotation.Inject; @@ -20,6 +21,9 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input @Inject private String m_baseDir = "target/hdfs"; + @Inject + private HdfsConfig m_hdfsConfig; + private FileSystem m_fs; private Path m_basePath; @@ -46,6 +50,7 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input config.setInt("io.file.buffer.size", 8192); + this.setServerUri(this.m_hdfsConfig.getServerUrl()); if (m_serverUri == null) { fs = FileSystem.getLocal(config); m_basePath = new Path(fs.getWorkingDirectory(), m_baseDir); diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java index 7aff32f7c..af7382e40 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/DefaultOutputChannelManager.java @@ -17,22 +17,27 @@ import org.codehaus.plexus.logging.Logger; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; +import com.dianping.cat.job.configuration.HdfsConfig; import com.dianping.cat.message.spi.MessagePathBuilder; import com.dianping.cat.message.spi.MessageTree; import com.site.lookup.ContainerHolder; import com.site.lookup.annotation.Inject; -public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, - LogEnabled { +public class DefaultOutputChannelManager extends ContainerHolder implements OutputChannelManager, Initializable, LogEnabled { @Inject private MessagePathBuilder m_builder; @Inject private String m_baseDir = "target/hdfs"; - @Inject private URI m_serverUri; + @Inject + private HdfsConfig m_hdfsConfig; + + @Inject + private String m_type = "data"; + private FileSystem m_fs; private Path m_basePath; @@ -88,6 +93,11 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp Configuration config = new Configuration(); FileSystem fs; + if ("data".equals(this.m_type)) { + this.setServerUri(this.m_hdfsConfig.getServerUrl()); + } else if ("dump".equals(this.m_type)) { + this.setServerUri(this.m_hdfsConfig.getDumpUrl()); + } config.setInt("io.file.buffer.size", 8192); if (m_serverUri == null) { fs = FileSystem.getLocal(config); @@ -148,4 +158,8 @@ public class DefaultOutputChannelManager extends ContainerHolder implements Outp m_serverUri = URI.create(serverUri); } } + + public void setType(String type) { + this.m_type = type; + } } diff --git a/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannelManager.java b/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannelManager.java index 93b4ae37d..d593d340a 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannelManager.java +++ b/cat-job/src/main/java/com/dianping/cat/job/hdfs/OutputChannelManager.java @@ -10,6 +10,8 @@ public interface OutputChannelManager { public void closeAllChannels(); public void closeChannel(OutputChannel channel); + + public void setServerUri(String serverUri); public OutputChannel openChannel(MessageTree tree, boolean forceNew) throws IOException; diff --git a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java index b034c348a..e7cf93a16 100644 --- a/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java +++ b/cat-job/src/main/java/com/dianping/cat/job/storage/RemoteMessageBucket.java @@ -98,14 +98,12 @@ public class RemoteMessageBucket implements Bucket, LogEnabled { } try { - Logview logview = m_logviewDao.findNextByMessageIdTags(id, direction, tagThread, tagSession, tagRequest, - LogviewEntity.READSET_FULL); + Logview logview = m_logviewDao.findNextByMessageIdTags(id, direction, tagThread, tagSession, tagRequest, LogviewEntity.READSET_FULL); MessageTree tree = readMessageTree(logview); return tree; } catch (DalException e) { - String message = String.format("Unable to find next message(%s) with tag(%s) and direction(%s)!", id, tagName, - direction); + String message = String.format("Unable to find next message(%s) with tag(%s) and direction(%s)!", id, tagName, direction); m_logger.error(message, e); return null; diff --git a/cat-job/src/main/resources/META-INF/dal/jdbc/dal-report.xml b/cat-job/src/main/resources/META-INF/dal/jdbc/dal-report.xml index 2924052bd..3efbfb680 100644 --- a/cat-job/src/main/resources/META-INF/dal/jdbc/dal-report.xml +++ b/cat-job/src/main/resources/META-INF/dal/jdbc/dal-report.xml @@ -47,7 +47,7 @@ FROM - WHERE = ${message-id} + WHERE = ${message-id} ]]> diff --git a/cat-job/src/main/resources/META-INF/dal/model/configuration-codegen.xml b/cat-job/src/main/resources/META-INF/dal/model/configuration-codegen.xml new file mode 100644 index 000000000..d4f18bdb0 --- /dev/null +++ b/cat-job/src/main/resources/META-INF/dal/model/configuration-codegen.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/cat-job/src/main/resources/META-INF/dal/model/configuration-model.xml b/cat-job/src/main/resources/META-INF/dal/model/configuration-model.xml new file mode 100644 index 000000000..767fd5ea7 --- /dev/null +++ b/cat-job/src/main/resources/META-INF/dal/model/configuration-model.xml @@ -0,0 +1,11 @@ + + + + + + + + + + \ No newline at end of file diff --git a/cat-job/src/main/resources/META-INF/dal/model/manifest.xml b/cat-job/src/main/resources/META-INF/dal/model/manifest.xml new file mode 100644 index 000000000..9d98dc414 --- /dev/null +++ b/cat-job/src/main/resources/META-INF/dal/model/manifest.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/cat-job/src/main/resources/META-INF/plexus/components.xml b/cat-job/src/main/resources/META-INF/plexus/components.xml index 6c1d437cc..9da075168 100644 --- a/cat-job/src/main/resources/META-INF/plexus/components.xml +++ b/cat-job/src/main/resources/META-INF/plexus/components.xml @@ -1,5 +1,9 @@ + + com.dianping.cat.job.configuration.HdfsConfig + com.dianping.cat.job.configuration.HdfsConfig + com.dianping.cat.job.hdfs.OutputChannel com.dianping.cat.job.hdfs.DefaultOutputChannel @@ -18,13 +22,16 @@ com.dianping.cat.job.hdfs.OutputChannelManager com.dianping.cat.job.hdfs.DefaultOutputChannelManager + data data - hdfs://192.168.7.43:9000/user/cat com.dianping.cat.message.spi.MessagePathBuilder + + com.dianping.cat.job.configuration.HdfsConfig + @@ -43,16 +50,20 @@ com.dianping.cat.job.hdfs.DefaultInputChannelManager data - hdfs://192.168.7.43:9000/user/cat + + + com.dianping.cat.job.configuration.HdfsConfig + + com.dianping.cat.job.hdfs.OutputChannelManager dump com.dianping.cat.job.hdfs.DefaultOutputChannelManager + dump dump - hdfs://192.168.7.43:9000/user/cat diff --git a/cat-job/src/test/resources/com/dianping/cat/job/configuration/config.xml b/cat-job/src/test/resources/com/dianping/cat/job/configuration/config.xml new file mode 100644 index 000000000..919c5e87a --- /dev/null +++ b/cat-job/src/test/resources/com/dianping/cat/job/configuration/config.xml @@ -0,0 +1,4 @@ + + + + diff --git a/cat-job/src/test/resources/com/dianping/cat/job/configuration/model/config.xml b/cat-job/src/test/resources/com/dianping/cat/job/configuration/model/config.xml new file mode 100644 index 000000000..919c5e87a --- /dev/null +++ b/cat-job/src/test/resources/com/dianping/cat/job/configuration/model/config.xml @@ -0,0 +1,4 @@ + + + + -- GitLab