提交 d1377018 编写于 作者: S sean.wang

bugfix

上级 7719a6f4
......@@ -16,6 +16,8 @@ public class DefaultInputChannel implements InputChannel {
private MessageCodec m_codec;
private FSDataInputStream m_in;
private String path;
@Override
public void close() {
......@@ -51,4 +53,13 @@ public class DefaultInputChannel implements InputChannel {
// TODO Auto-generated method stub
return false;
}
@Override
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
}
......@@ -2,10 +2,6 @@ package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
......@@ -19,8 +15,7 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationExce
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
public class DefaultInputChannelManager extends ContainerHolder implements InputChannelManager, Initializable,
LogEnabled {
public class DefaultInputChannelManager extends ContainerHolder implements InputChannelManager, Initializable, LogEnabled {
@Inject
private URI m_serverUri;
......@@ -29,38 +24,16 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
private FileSystem m_fs;
private Map<String, DefaultInputChannel> m_channels = new HashMap<String, DefaultInputChannel>();
private Logger m_logger;
private Path m_basePath;
@Override
public void cleanupChannels() {
try {
List<String> expired = new ArrayList<String>();
for (Map.Entry<String, DefaultInputChannel> e : m_channels.entrySet()) {
if (e.getValue().isExpired()) {
expired.add(e.getKey());
}
}
for (String path : expired) {
InputChannel channel = m_channels.remove(path);
closeChannel(channel);
}
} catch (Exception e) {
m_logger.warn("Error when doing cleanup!", e);
}
}
@Override
public void closeAllChannels() {
for (DefaultInputChannel channel : m_channels.values()) {
closeChannel(channel);
}
}
@Override
......@@ -81,7 +54,7 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
FileSystem fs;
config.setInt("io.file.buffer.size", 8192);
if (m_serverUri == null) {
fs = FileSystem.getLocal(config);
m_basePath = new Path(fs.getWorkingDirectory(), m_baseDir);
......@@ -98,18 +71,11 @@ public class DefaultInputChannelManager extends ContainerHolder implements Input
@Override
public InputChannel openChannel(String path) throws IOException {
DefaultInputChannel channel = m_channels.get(path);
if (channel == null) {
Path file = new Path(m_basePath, path);
FSDataInputStream in = m_fs.open(file);
channel = (DefaultInputChannel) lookup(InputChannel.class);
channel.initialize(in);
m_channels.put(path, channel);
}
Path file = new Path(m_basePath, path);
FSDataInputStream in = m_fs.open(file);
DefaultInputChannel channel = (DefaultInputChannel) lookup(InputChannel.class);
channel.setPath(path);
channel.initialize(in);
return channel;
}
......
......@@ -26,4 +26,9 @@ public interface InputChannel {
* @throws IOException
*/
public MessageTree read(long offset, int length) throws IOException;
/**
* @return
*/
String getPath();
}
......@@ -5,7 +5,7 @@
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannel</implementation>
<instantiation-strategy>per-lookup</instantiation-strategy>
<configuration>
<maxSize>134217728</maxSize>
<maxSize>2097152</maxSize>
</configuration>
<requirements>
<requirement>
......@@ -17,10 +17,6 @@
<component>
<role>com.dianping.cat.job.hdfs.OutputChannelManager</role>
<implementation>com.dianping.cat.job.hdfs.DefaultOutputChannelManager</implementation>
<configuration>
<baseDir>data</baseDir>
<serverUri>hdfs://192.168.7.43:9000/user/cat/</serverUri>
</configuration>
<requirements>
<requirement>
<role>com.dianping.cat.message.spi.MessagePathBuilder</role>
......@@ -41,10 +37,6 @@
<component>
<role>com.dianping.cat.job.hdfs.InputChannelManager</role>
<implementation>com.dianping.cat.job.hdfs.DefaultInputChannelManager</implementation>
<configuration>
<baseDir>data</baseDir>
<serverUri>hdfs://192.168.7.43:9000/user/cat/</serverUri>
</configuration>
</component>
<component>
<role>com.dianping.cat.message.spi.MessageStorage</role>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册