提交 9d465fe0 编写于 作者: S sean.wang

dev...

上级 53c758a1
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.InputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.site.lookup.annotation.Inject;
/**
 * Default {@link InputChannel} implementation: wraps an {@link InputStream}
 * (opened over an HDFS file by the channel manager) and decodes message trees
 * from it with the injected {@link MessageCodec}.
 */
public class DefaultInputChannel implements InputChannel {
	@Inject
	private MessageCodec m_codec;

	private InputStream m_in;

	@Override
	public void close() {
		// Guard clause: nothing to do once the stream is gone.
		if (m_in == null) {
			return;
		}

		try {
			m_in.close();
			m_in = null;
		} catch (IOException ignored) {
			// Best-effort close: a failure while closing is deliberately swallowed,
			// matching the original behavior (m_in stays set if close() throws).
		}
	}

	@Override
	public void initialize(InputStream in) {
		m_in = in;
	}

	@Override
	public MessageTree read(int index, int length) throws IOException {
		// NOTE(review): 'index' is currently unused — presumably a record index or
		// byte offset the caller positions the stream to beforehand; confirm.
		ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(8192);
		MessageTree tree = new DefaultMessageTree();

		// Pull exactly 'length' bytes from the stream, then decode one tree.
		buffer.writeBytes(m_in, length);
		m_codec.decode(buffer, tree);
		return tree;
	}

	@Override
	public boolean isExpired() {
		// NOTE(review): stub — channels never expire yet, so cleanupChannels()
		// in the manager is currently a no-op. TODO implement an expiry policy.
		return false;
	}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.ContainerHolder;
import com.site.lookup.annotation.Inject;
/**
 * Manages {@link InputChannel}s keyed by their HDFS path. Channels are created
 * lazily in {@link #openChannel(String)}, cached in {@code m_channels}, and
 * released either individually ({@link #cleanupChannels()} for expired ones)
 * or all at once ({@link #closeAllChannels()}).
 * <p>
 * NOTE(review): {@code m_channels} is a plain HashMap with no synchronization;
 * this class assumes single-threaded access — confirm against callers.
 */
public class DefaultInputChannelManager extends ContainerHolder implements InputChannelManager, Initializable, LogEnabled {
	@Inject
	private MessagePathBuilder m_builder;

	@Inject
	private String m_baseDir = "target/hdfs";

	@Inject
	private URI m_serverUri;

	private FileSystem m_fs;

	private Path m_basePath;

	private String m_ipAddress;

	// Open channels keyed by relative HDFS path.
	private Map<String, InputChannel> m_channels = new HashMap<String, InputChannel>();

	private Logger m_logger;

	@Override
	public void cleanupChannels() {
		try {
			// Collect keys first to avoid mutating the map while iterating it.
			List<String> expired = new ArrayList<String>();

			for (Map.Entry<String, InputChannel> e : m_channels.entrySet()) {
				if (e.getValue().isExpired()) {
					expired.add(e.getKey());
				}
			}

			for (String path : expired) {
				InputChannel channel = m_channels.remove(path);

				closeChannel(channel);
			}
		} catch (Exception e) {
			// Cleanup is best-effort; log and carry on.
			m_logger.warn("Error when doing cleanup!", e);
		}
	}

	@Override
	public void closeAllChannels() {
		for (InputChannel channel : m_channels.values()) {
			closeChannel(channel);
		}

		// FIX: drop the now-closed channels so the cache is not left holding
		// dead references (and openChannel() does not hand them back out).
		m_channels.clear();
	}

	@Override
	public void closeChannel(InputChannel channel) {
		channel.close();
		super.release(channel);
	}

	@Override
	public void enableLogging(Logger logger) {
		m_logger = logger;
	}

	@Override
	public void initialize() throws InitializationException {
		try {
			Configuration config = new Configuration();
			FileSystem fs;

			config.setInt("io.file.buffer.size", 8192);

			if (m_serverUri == null) {
				// No server configured: fall back to the local file system.
				fs = FileSystem.getLocal(config);
			} else {
				fs = FileSystem.get(m_serverUri, config); // TODO Not tested yet
			}

			m_fs = fs;
			m_basePath = new Path(m_fs.getWorkingDirectory(), m_baseDir);
		} catch (Exception e) {
			throw new InitializationException("Error when getting HDFS file system.", e);
		}

		try {
			InetAddress localHost = InetAddress.getLocalHost();

			m_ipAddress = localHost.getHostAddress();
		} catch (UnknownHostException e) {
			// Non-fatal: m_ipAddress stays null; paths built from it may be wrong.
			m_logger.warn("Unable to get local host!", e);
		}
	}

	public void setBaseDir(String baseDir) {
		m_baseDir = baseDir;
	}

	public void setServerUri(String serverUri) {
		m_serverUri = URI.create(serverUri);
	}

	@Override
	public InputChannel openChannel(String messageId) throws IOException {
		// FIX: the original referenced an undefined variable 'tree' here, which
		// did not compile; 'messageId' is the only candidate in scope.
		// NOTE(review): confirm MessagePathBuilder.getHdfsPath accepts a message
		// id (vs. a MessageTree) — this is a dev-in-progress call site.
		String path = m_builder.getHdfsPath(messageId, m_ipAddress);
		InputChannel channel = m_channels.get(path);

		if (channel == null) {
			// "-0" suffix mirrors the writer's first-segment naming convention.
			Path file = new Path(m_basePath, path + "-0");
			FSDataInputStream in = m_fs.open(file);

			channel = lookup(InputChannel.class);
			channel.initialize(in);
			m_channels.put(path, channel);
		}

		return channel;
	}
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
import java.io.InputStream;
import com.dianping.cat.message.spi.MessageTree;
/**
 * A read channel over one HDFS-stored message file, from which encoded
 * {@link MessageTree}s can be fetched.
 */
public interface InputChannel {
/**
 * Close the channel and release its underlying stream. Safe to call more
 * than once.
 */
public void close();
/**
 * Initialize the channel with an input stream to read messages from.
 *
 * @param in the stream backing this channel
 */
public void initialize(InputStream in);
/**
 * Check if the channel is expired and should be reclaimed by its manager.
 *
 * @return true if the channel is expired, false otherwise.
 */
public boolean isExpired();
/**
 * Fetch one message tree from hdfs.
 *
 * @param index position of the message within the file
 * @param length number of bytes to read and decode
 * @return the decoded message tree
 * @throws IOException if reading from the underlying stream fails
 */
public MessageTree read(int index, int length) throws IOException;
}
package com.dianping.cat.job.hdfs;
import java.io.IOException;
/**
 * Manages the lifecycle of {@link InputChannel}s: opening them by message id,
 * closing them individually or en masse, and reclaiming expired ones.
 */
public interface InputChannelManager {
// Close and release every channel whose isExpired() reports true.
public void cleanupChannels();
// Close and release all managed channels.
public void closeAllChannels();
// Close the given channel and return it to the container.
public void closeChannel(InputChannel channel);
// Look up (or lazily create) the channel serving the given message id.
public InputChannel openChannel(String messageId) throws IOException;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册