提交 2c4d5340 编写于 作者: S sean.wang

refactor

上级 847076a9
......@@ -13,7 +13,7 @@ import org.apache.hadoop.fs.FileSystem;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.hdfs.HdfsHelper;
import com.dianping.cat.storage.hdfs.HdfsImpl;
import com.dianping.cat.storage.HdfsImpl;
/**
* @author sean.wang
......
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
import java.io.IOException;
......
/**
*
*/
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
import java.io.File;
import java.io.IOException;
......@@ -16,6 +16,9 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.FileSystem;
import com.dianping.cat.storage.hdfs.HdfsDataStore;
import com.dianping.cat.storage.hdfs.HdfsIndexStore;
/**
* @author sean.wang
* @since Mar 7, 2012
......@@ -42,14 +45,14 @@ public class HdfsImpl implements Tkv {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#close()
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#close()
*/
@Override
public void close() throws IOException {
try {
writeLock.lock();
this.getIndexStore().close();
this.getDataStore().close();
this.indexStore.close();
this.dataStore.close();
} finally {
writeLock.unlock();
}
......@@ -58,11 +61,11 @@ public class HdfsImpl implements Tkv {
/*
*
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#get(int)
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#get(int)
*/
@Override
public byte[] get(int indexPos) throws IOException {
Meta meta = this.getIndex(indexPos);
Meta meta = this.indexStore.getIndex(indexPos);
if (meta == null) {
return null;
}
......@@ -72,7 +75,7 @@ public class HdfsImpl implements Tkv {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#get(java.lang.String)
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#get(java.lang.String)
*/
@Override
public byte[] get(String key) throws IOException {
......@@ -86,7 +89,7 @@ public class HdfsImpl implements Tkv {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#get(java.lang.String, java.lang.String)
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#get(java.lang.String, java.lang.String)
*/
@Override
public byte[] get(String key, String tag) throws IOException {
......@@ -104,31 +107,31 @@ public class HdfsImpl implements Tkv {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#getIndex(int)
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#getIndex(int)
*/
@Override
public Meta getIndex(int indexPos) throws IOException {
return this.getIndexStore().getIndex(indexPos);
return this.indexStore.getIndex(indexPos);
}
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#getIndex(java.lang.String)
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#getIndex(java.lang.String)
*/
@Override
public Meta getIndex(String key) throws IOException {
return this.getIndexStore().getIndex(key);
return this.indexStore.getIndex(key);
}
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#getIndex(java.lang.String, java.lang.String)
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#getIndex(java.lang.String, java.lang.String)
*/
@Override
public Meta getIndex(String key, String tag) throws IOException {
return this.getIndexStore().getIndex(key, tag);
return this.indexStore.getIndex(key, tag);
}
public IndexStore getIndexStore() {
......@@ -138,7 +141,7 @@ public class HdfsImpl implements Tkv {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#getRecord(java.lang.String, java.lang.String)
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#getRecord(java.lang.String, java.lang.String)
*/
@Override
public Record getRecord(String key, String tag) throws IOException {
......@@ -151,7 +154,7 @@ public class HdfsImpl implements Tkv {
* @throws IOException
*/
private byte[] getValue(Meta meta) throws IOException {
return getDataStore().get(meta.getOffset(), meta.getLength());
return this.dataStore.get(meta.getOffset(), meta.getLength());
}
private List<Meta> metas = new ArrayList<Meta>();
......@@ -185,7 +188,7 @@ public class HdfsImpl implements Tkv {
}
}
for (Meta meta : metas) {
this.getIndexStore().append(meta);
this.indexStore.append(meta);
}
this.metas.clear();
} finally {
......@@ -196,7 +199,7 @@ public class HdfsImpl implements Tkv {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#put(java.lang.String, byte[])
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#put(java.lang.String, byte[])
*/
@Override
public boolean put(String key, byte[] value) throws IOException {
......@@ -209,11 +212,11 @@ public class HdfsImpl implements Tkv {
public boolean put(String key, byte[] value, String... tagNames) throws IOException {
try {
this.writeLock.lock();
if (this.getIndexStore().getIndex(key) != null) {
if (this.indexStore.getIndex(key) != null) {
return false; // this key already exists
}
long offset = this.getDataStore().length();
this.getDataStore().append(value);
long offset = this.dataStore.length();
this.dataStore.append(value);
Meta meta = new Meta();
meta.setKey(key);
meta.setOffset(offset);
......@@ -233,11 +236,11 @@ public class HdfsImpl implements Tkv {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.hdfs.Tkv#size()
* @see com.dianping.com.dianping.cat.storage.hdfs.Tkv#size()
*/
@Override
public long size() throws IOException {
return this.getIndexStore().size();
return this.indexStore.size();
}
public void startWrite() throws IOException {
......@@ -271,13 +274,13 @@ public class HdfsImpl implements Tkv {
boolean indexDeleted = this.indexStore.delete();
return dataDeleted && indexDeleted;
}
public boolean deleteLocal() throws IOException {
boolean dataDeleted = this.dataStore.deleteLocal();
boolean indexDeleted = this.indexStore.deleteLocal();
return dataDeleted && indexDeleted;
}
public boolean deleteRemote() throws IOException {
boolean dataDeleted = this.dataStore.deleteRemote();
boolean indexDeleted = this.indexStore.deleteRemote();
......
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.Comparator;
......@@ -9,7 +9,7 @@ public interface IndexStore {
void close() throws IOException;
Meta getIndex(int indexPos) throws IOException;
Meta getIndex(long indexPos) throws IOException;
Meta getIndex(String key) throws IOException;
......
/**
*
*/
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
import static com.dianping.cat.storage.hdfs.util.NumberKit.bytes2Int;
import static com.dianping.cat.storage.hdfs.util.NumberKit.int2Bytes;
import static com.dianping.cat.storage.util.NumberKit.bytes2Int;
import static com.dianping.cat.storage.util.NumberKit.int2Bytes;
import java.io.File;
import java.io.IOException;
......@@ -16,7 +16,8 @@ import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.dianping.cat.storage.hdfs.util.StringKit;
import com.dianping.cat.storage.local.RAFDataStore;
import com.dianping.cat.storage.util.StringKit;
/**
......
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
import java.util.HashMap;
import java.util.Map;
......
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
public class Record {
......
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
public class Tag implements Comparable<Tag> {
public class Tag {
private int previous = -1;
private int pos = 0;
private int next = -1;
private String name;
@Override
public int compareTo(Tag o) {
return this.name.compareTo(o.name);
}
public String getName() {
return name;
}
......
/**
*
*/
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage;
import java.io.IOException;
......
......@@ -10,6 +10,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.dianping.cat.storage.DataStore;
/**
* @author sean.wang
* @since Mar 7, 2012
......@@ -37,34 +39,38 @@ public class HdfsDataStore implements DataStore {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.DataStore#append(byte)
* @see com.dianping.com.dianping.cat.storage.DataStore#append(byte)
*/
@Override
public void append(byte b) throws IOException {
this.output.write(b);
this.length++;
synchronized (this.output) {
this.output.write(b);
this.length++;
}
}
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.DataStore#append(byte[])
* @see com.dianping.com.dianping.cat.storage.DataStore#append(byte[])
*/
@Override
public void append(byte[] bytes) throws IOException {
this.output.write(bytes);
this.length += bytes.length;
synchronized (this.output) {
this.output.write(bytes);
this.length += bytes.length;
}
}
@Override
public void append(long offset, byte[] bytes) throws IOException {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Hdfs unsupport random write!");
}
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.DataStore#close()
* @see com.dianping.com.dianping.cat.storage.DataStore#close()
*/
@Override
public void close() throws IOException {
......@@ -119,7 +125,7 @@ public class HdfsDataStore implements DataStore {
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.DataStore#get(long, int)
* @see com.dianping.com.dianping.cat.storage.DataStore#get(long, int)
*/
@Override
public byte[] get(long offset, int length) throws IOException {
......@@ -128,15 +134,20 @@ public class HdfsDataStore implements DataStore {
throw new IllegalStateException("input can't null");
}
byte[] bytes = new byte[length];
in.seek(offset);
in.read(bytes);
synchronized (in) {
in.seek(offset);
int actual = in.read(bytes);
if (actual != length) {
throw new IOException(String.format("readed bytes expect %s actual %s", length, actual));
}
}
return bytes;
}
/*
* (non-Javadoc)
*
* @see com.dianping.com.dianping.cat.storage.hdfs.DataStore#length()
* @see com.dianping.com.dianping.cat.storage.DataStore#length()
*/
@Override
public long length() throws IOException {
......
......@@ -12,7 +12,10 @@ import java.util.Comparator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.dianping.cat.storage.hdfs.util.IoKit;
import com.dianping.cat.storage.IndexStore;
import com.dianping.cat.storage.Meta;
import com.dianping.cat.storage.local.RAFIndexStore;
import com.dianping.cat.storage.util.IoKit;
......@@ -93,7 +96,7 @@ public class HdfsIndexStore implements IndexStore {
* @see com.dianping.cat.storage.hdfs.hdfs.IndexStore#getIndex(int)
*/
@Override
public Meta getIndex(int indexPos) throws IOException {
public Meta getIndex(long indexPos) throws IOException {
return this.localIndexStore.getIndex(indexPos);
}
......
/**
*
*/
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage.local;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import com.dianping.cat.storage.DataStore;
/**
* @author sean.wang
......@@ -50,10 +52,13 @@ public class RAFDataStore implements DataStore {
}
@Override
public byte[] get(long pos, int size) throws IOException {
byte[] bytes = new byte[size];
readRAF.seek(pos);
readRAF.read(bytes);
public byte[] get(long offset, int length) throws IOException {
byte[] bytes = new byte[length];
readRAF.seek(offset);
int actual = readRAF.read(bytes);
if(actual != length) {
throw new IOException(String.format("readed bytes expect %s actual %s", length, actual));
}
return bytes;
}
......
/**
*
*/
package com.dianping.cat.storage.hdfs;
package com.dianping.cat.storage.local;
import java.io.File;
import java.io.FileInputStream;
......@@ -17,8 +17,11 @@ import java.util.Map;
import org.apache.commons.lang.ArrayUtils;
import com.dianping.cat.storage.hdfs.util.ArrayKit;
import com.dianping.cat.storage.hdfs.util.NumberKit;
import com.dianping.cat.storage.IndexStore;
import com.dianping.cat.storage.Meta;
import com.dianping.cat.storage.Tag;
import com.dianping.cat.storage.util.ArrayKit;
import com.dianping.cat.storage.util.NumberKit;
/**
* @author sean.wang
......@@ -39,6 +42,8 @@ public class RAFIndexStore implements IndexStore {
private File storeFile;
private long length;
private final static int OFFSET_LEN = 4;
private final static int LENGTH_LEN = 4;
......@@ -57,13 +62,14 @@ public class RAFIndexStore implements IndexStore {
return fixed;
}
public RAFIndexStore(File storeFile, int keyLength, int tagLength) throws FileNotFoundException {
public RAFIndexStore(File storeFile, int keyLength, int tagLength) throws IOException {
this.keyLength = keyLength;
this.tagLength = tagLength;
this.indexLength = this.keyLength + OFFSET_LEN + LENGTH_LEN + tagLength;
this.storeFile = storeFile;
writeRAF = new RandomAccessFile(storeFile, "rw");
readRAF = new RandomAccessFile(storeFile, "r");
this.length = this.readRAF.length();
}
@Override
......@@ -99,19 +105,20 @@ public class RAFIndexStore implements IndexStore {
buf[i] = TAG_SPLITER;
}
synchronized (this.writeRAF) {
this.writeRAF.seek(writeRAF.length());
this.writeRAF.seek(this.length);
this.writeRAF.write(buf);
this.length += buf.length;
}
}
private int binarySearchPos(String key, Comparator<byte[]> keyComp) throws IOException {
int low = 0;
private long binarySearchPos(String key, Comparator<byte[]> keyComp) throws IOException {
long low = 0;
int indexLength = this.indexLength;
int keyLength = this.keyLength;
int high = (int) this.readRAF.length() / indexLength;
long high = (int) this.length / indexLength - 1;
while (low <= high) {
int mid = (low + high) >>> 1;
long mid = (low + high) >>> 1;
byte[] midVal = this.getBytes(mid * indexLength, keyLength);
int cmp = keyComp.compare(midVal, toFixedKey(key));
......@@ -140,10 +147,13 @@ public class RAFIndexStore implements IndexStore {
}
}
private byte[] getBytes(long pos, int size) throws IOException {
byte[] bytes = new byte[size];
readRAF.seek(pos);
readRAF.read(bytes);
private byte[] getBytes(long offset, int length) throws IOException {
byte[] bytes = new byte[length];
readRAF.seek(offset);
int actual = readRAF.read(bytes);
if (actual != length) {
throw new IOException(String.format("readed bytes expect %s actual %s", length, actual));
}
return bytes;
}
......@@ -153,10 +163,8 @@ public class RAFIndexStore implements IndexStore {
* @see com.dianping.cat.storage.hdfs.hdfs.IndexStore#getIndex(int)
*/
@Override
public Meta getIndex(int indexPos) throws IOException {
this.readRAF.seek(indexPos * this.indexLength);
byte[] bytes = new byte[this.indexLength];
this.readRAF.read(bytes);
public Meta getIndex(long indexPos) throws IOException {
byte[] bytes = this.getBytes(1L * indexPos * this.indexLength, this.indexLength);
return deserialMeta(bytes);
}
......@@ -224,8 +232,11 @@ public class RAFIndexStore implements IndexStore {
*/
@Override
public Meta getIndex(String key, Comparator<byte[]> keyComp) throws IOException {
if (this.size() == 0) {
return null;
}
synchronized (this.readRAF) {
int pos = this.binarySearchPos(key, keyComp);
long pos = this.binarySearchPos(key, keyComp);
if (pos < 0) {
return null;
}
......@@ -258,7 +269,7 @@ public class RAFIndexStore implements IndexStore {
*/
@Override
public long size() throws IOException {
return this.length() / this.indexLength;
return this.length / this.indexLength;
}
@Override
......@@ -268,7 +279,7 @@ public class RAFIndexStore implements IndexStore {
@Override
public long length() throws IOException {
return this.readRAF.length();
return this.length;
}
@Override
......
package com.dianping.cat.storage.hdfs.util;
package com.dianping.cat.storage.util;
import java.util.ArrayList;
import java.util.List;
......
package com.dianping.cat.storage.hdfs.util;
package com.dianping.cat.storage.util;
import java.io.IOException;
import java.io.InputStream;
......
package com.dianping.cat.storage.hdfs.util;
package com.dianping.cat.storage.util;
public class NumberKit {
public static int bytes2Int(byte[] bytes) {
......
package com.dianping.cat.storage.hdfs.util;
package com.dianping.cat.storage.util;
import java.io.IOException;
import java.io.Writer;
......@@ -26,7 +26,11 @@ public class StringKit {
* @return
*/
public static String capitalize(String str) {
<<<<<<< HEAD:cat-job/src/main/java/com/dianping/cat/storage/hdfs/util/StringKit.java
if (str == null || str.length()==0)
=======
if (str == null || "".equals(str))
>>>>>>> refactor:cat-job/src/main/java/com/dianping/cat/storage/util/StringKit.java
return str;
char[] chars = str.toCharArray();
char c = chars[0];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册