提交 5acbeb4f 编写于 作者: Y You Yong

Merge branch 'master' of ssh://192.168.8.22:58422/cat

...@@ -179,7 +179,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo ...@@ -179,7 +179,11 @@ public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionRepo
String sessionTag = "s:" + tree.getSessionToken(); String sessionTag = "s:" + tree.getSessionToken();
String requestTag = "r:" + messageId; String requestTag = "r:" + messageId;
try {
m_messageBucket.storeById(messageId, tree, threadTag, sessionTag, requestTag); m_messageBucket.storeById(messageId, tree, threadTag, sessionTag, requestTag);
} catch (IOException e) {
m_logger.error("", e);
}
} }
} }
} }
......
...@@ -13,7 +13,7 @@ public interface MessageStorage { ...@@ -13,7 +13,7 @@ public interface MessageStorage {
* message tree to store * message tree to store
* @return relative path to base directory or base URL * @return relative path to base directory or base URL
*/ */
public String store(MessageTree tree); public String store(MessageTree tree, String... tag);
/** /**
* Fetch a message tree from the store. * Fetch a message tree from the store.
...@@ -23,6 +23,18 @@ public interface MessageStorage { ...@@ -23,6 +23,18 @@ public interface MessageStorage {
*/ */
public MessageTree get(String messageId); public MessageTree get(String messageId);
/**
* @param messageId
* @return
*/
public MessageTree next(String messageId, String tag);
/**
* @param messageId
* @return
*/
public MessageTree previous(String messageId, String tag);
/** /**
* *
* Get relative path to base directory or base URL. * Get relative path to base directory or base URL.
......
...@@ -58,7 +58,7 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis ...@@ -58,7 +58,7 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis
} }
@Override @Override
public String store(MessageTree tree) { public String store(MessageTree tree, String... tags) {
String path = m_builder.getLogViewPath(tree.getMessageId()); String path = m_builder.getLogViewPath(tree.getMessageId());
m_job.append(tree); m_job.append(tree);
...@@ -145,4 +145,16 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis ...@@ -145,4 +145,16 @@ public class DefaultMessageStorage implements MessageStorage, Initializable, Dis
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@Override
public MessageTree next(String messageId, String tag) {
// TODO Auto-generated method stub
return null;
}
@Override
public MessageTree previous(String messageId, String tag) {
// TODO Auto-generated method stub
return null;
}
} }
...@@ -5,16 +5,16 @@ import java.io.IOException; ...@@ -5,16 +5,16 @@ import java.io.IOException;
import java.util.List; import java.util.List;
public interface Bucket<T> extends TagThreadSupport<T> { public interface Bucket<T> extends TagThreadSupport<T> {
public void close(); public void close() throws IOException;
public void deleteAndCreate(); public void deleteAndCreate() throws IOException;
public List<T> findAllByIds(List<String> ids); public List<T> findAllByIds(List<String> ids) throws IOException;;
public T findById(String id); public T findById(String id) throws IOException;;
public void initialize(Class<?> type, File path) throws IOException; public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException;
public boolean storeById(String id, T data); public boolean storeById(String id, T data) throws IOException;;
} }
package com.dianping.cat.storage; package com.dianping.cat.storage;
import java.io.IOException;
import java.util.List; import java.util.List;
/** /**
...@@ -9,11 +10,11 @@ import java.util.List; ...@@ -9,11 +10,11 @@ import java.util.List;
* Sample tags: "thread:101", "session:abc", "request:xyz" * Sample tags: "thread:101", "session:abc", "request:xyz"
*/ */
public interface TagThreadSupport<T> { public interface TagThreadSupport<T> {
public boolean storeById(String id, T data, String... tags); public boolean storeById(String id, T data, String... tags) throws IOException;;
public List<String> findAllIdsByTag(String tag); public List<String> findAllIdsByTag(String tag) throws IOException;;
public T findNextById(String id, Direction direction, String tag); public T findNextById(String id, Direction direction, String tag) throws IOException;;
public static enum Direction { public static enum Direction {
FORWARD, FORWARD,
......
...@@ -174,7 +174,8 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo ...@@ -174,7 +174,8 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
} }
@Override @Override
public void initialize(Class<?> type, File path) throws IOException { public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException {
File path = new File(baseDir, logicalPath);
m_writeLock = new ReentrantLock(); m_writeLock = new ReentrantLock();
m_readLock = new ReentrantLock(); m_readLock = new ReentrantLock();
m_file = path; m_file = path;
......
...@@ -21,8 +21,7 @@ public class DefaultBucket<T> extends AbstractFileBucket<T> { ...@@ -21,8 +21,7 @@ public class DefaultBucket<T> extends AbstractFileBucket<T> {
return (T) bytes; return (T) bytes;
} else { } else {
throw new UnsupportedOperationException(String.format( throw new UnsupportedOperationException(String.format("Only String or byte[] are supported so far, but was %s.", m_type));
"Only String or byte[] are supported so far, but was %s.", m_type));
} }
} }
...@@ -40,20 +39,18 @@ public class DefaultBucket<T> extends AbstractFileBucket<T> { ...@@ -40,20 +39,18 @@ public class DefaultBucket<T> extends AbstractFileBucket<T> {
buf.writeInt(bytes.length); buf.writeInt(bytes.length);
buf.writeBytes(bytes); buf.writeBytes(bytes);
} else { } else {
throw new UnsupportedOperationException(String.format( throw new UnsupportedOperationException(String.format("Only String or byte[] are supported so far, but was %s.", m_type));
"Only String or byte[] are supported so far, but was %s.", m_type));
} }
} }
@Override @Override
public void initialize(Class<?> type, File path) throws IOException { public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException {
super.initialize(type, path); super.initialize(type, baseDir, logicalPath);
m_type = type; m_type = type;
if (m_type != String.class && m_type != byte[].class) { if (m_type != String.class && m_type != byte[].class) {
throw new UnsupportedOperationException(String.format( throw new UnsupportedOperationException(String.format("Only String or byte[] are supported so far, but was %s.", m_type));
"Only String or byte[] are supported so far, but was %s.", m_type));
} }
} }
......
...@@ -22,7 +22,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag ...@@ -22,7 +22,7 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
protected Bucket<?> createBucket(String path, Class<?> type) throws IOException { protected Bucket<?> createBucket(String path, Class<?> type) throws IOException {
Bucket<?> bucket = lookup(Bucket.class, type.getName()); Bucket<?> bucket = lookup(Bucket.class, type.getName());
bucket.initialize(type, new File(m_baseDir, path)); bucket.initialize(type, new File(m_baseDir), path);
return bucket; return bucket;
} }
...@@ -115,7 +115,11 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag ...@@ -115,7 +115,11 @@ public class DefaultBucketManager extends ContainerHolder implements BucketManag
@Override @Override
public void closeBucket(Bucket<?> bucket) { public void closeBucket(Bucket<?> bucket) {
try {
bucket.close(); bucket.close();
} catch (IOException e) {
e.printStackTrace();
}
release(bucket); release(bucket);
} }
} }
...@@ -53,7 +53,7 @@ public class BucketTest extends ComponentTestCase { ...@@ -53,7 +53,7 @@ public class BucketTest extends ComponentTestCase {
// close and reload it, check if everything is okay // close and reload it, check if everything is okay
bucket.close(); bucket.close();
bucket.initialize(byte[].class, new File("target/bucket/bytes")); bucket.initialize(byte[].class, new File("target/bucket/"), "bytes");
// store it and load it // store it and load it
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
...@@ -99,7 +99,7 @@ public class BucketTest extends ComponentTestCase { ...@@ -99,7 +99,7 @@ public class BucketTest extends ComponentTestCase {
// close and reload it, check if everything is okay // close and reload it, check if everything is okay
bucket.close(); bucket.close();
bucket.initialize(MessageTree.class, new File("target/bucket/message")); bucket.initialize(MessageTree.class, new File("target/bucket/"), "message");
// check next message in the same thread // check next message in the same thread
for (int i = 0; i < groups - 1; i++) { for (int i = 0; i < groups - 1; i++) {
...@@ -135,7 +135,7 @@ public class BucketTest extends ComponentTestCase { ...@@ -135,7 +135,7 @@ public class BucketTest extends ComponentTestCase {
// close and reload it, check if everything is okay // close and reload it, check if everything is okay
bucket.close(); bucket.close();
bucket.initialize(String.class, new File("target/bucket/data")); bucket.initialize(String.class, new File("target/bucket/"), "data");
// store it and load it // store it and load it
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
......
...@@ -42,6 +42,11 @@ ...@@ -42,6 +42,11 @@
<artifactId>jackson-mapper-asl</artifactId> <artifactId>jackson-mapper-asl</artifactId>
<version>1.9.4</version> <version>1.9.4</version>
</dependency> </dependency>
<dependency>
<groupId>com.dianping.tkv</groupId>
<artifactId>fkv</artifactId>
<version>0.1.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
......
/**
*
*/
package com.dianping.cat.job.hdfs;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.dianping.cat.storage.Bucket;
import com.dianping.tkv.Meta;
import com.dianping.tkv.hdfs.HdfsImpl;
/**
* @author sean.wang
* @since Mar 9, 2012
*/
public class HdfsBucket implements Bucket<byte[]> {
private HdfsImpl hdfs;
private int keyLength = 32;
private int tagLength = 125;
public void setHdfs(HdfsImpl hdfs) {
this.hdfs = hdfs;
}
public void setKeyLength(int keyLength) {
this.keyLength = keyLength;
}
public void setTagLength(int tagLength) {
this.tagLength = tagLength;
}
@Override
public boolean storeById(String id, byte[] data, String... tags) {
throw new UnsupportedOperationException();
}
@Override
public List<String> findAllIdsByTag(String tag) {
throw new UnsupportedOperationException();
}
@Override
public byte[] findNextById(String id, com.dianping.cat.storage.TagThreadSupport.Direction direction, String tag) throws IOException {
Meta meta = hdfs.getIndex(id, tag);
if (meta == null) {
return null;
}
int nextPos = meta.getTags().get(tag).getNext();
return hdfs.get(nextPos);
}
@Override
public void close() throws IOException {
this.hdfs.close();
}
@Override
public void deleteAndCreate() {
throw new UnsupportedOperationException();
}
@Override
public List<byte[]> findAllByIds(List<String> ids) throws IOException {
List<byte[]> values = new ArrayList<byte[]>(ids.size());
for (String id : ids) {
byte[] value = this.findById(id);
values.add(value);
}
return values;
}
@Override
public byte[] findById(String id) throws IOException {
return hdfs.get(id);
}
/**
* @param baseDir
* e.g /data/appdata/cat/
* @param logicalPath
* e.g /a/b/c
*/
@Override
public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException {
String name = new File(logicalPath).getName();
String indexFilename = null;
String dataFilename = null;
String hdfsDir = null;
hdfs = new HdfsImpl(hdfsDir, baseDir, indexFilename, dataFilename, keyLength, tagLength);
}
@Override
public boolean storeById(String id, byte[] data) {
throw new UnsupportedOperationException();
}
}
/**
*
*/
package com.dianping.cat.job.hdfs;
import com.dianping.cat.storage.internal.DefaultBucketManager;
/**
* @author sean.wang
* @since Mar 9, 2012
*/
public class HdfsBucketManager extends DefaultBucketManager {
}
...@@ -11,11 +11,15 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable; ...@@ -11,11 +11,15 @@ import org.codehaus.plexus.personality.plexus.lifecycle.phase.Disposable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable; import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException; import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import com.dianping.cat.message.spi.MessagePathBuilder;
import com.dianping.cat.message.spi.MessageStorage; import com.dianping.cat.message.spi.MessageStorage;
import com.dianping.cat.message.spi.MessageTree; import com.dianping.cat.message.spi.MessageTree;
import com.site.lookup.annotation.Inject; import com.site.lookup.annotation.Inject;
public class HdfsMessageStorage implements MessageStorage, Initializable, Disposable, LogEnabled { public class HdfsMessageStorage implements MessageStorage, Initializable, Disposable, LogEnabled {
@Inject
private MessagePathBuilder m_builder;
@Inject @Inject
private OutputChannelManager m_manager; private OutputChannelManager m_manager;
...@@ -55,12 +59,13 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos ...@@ -55,12 +59,13 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos
@Override @Override
public String getPath(MessageTree tree) { public String getPath(MessageTree tree) {
// TODO fix it String path = m_builder.getLogViewPath(tree.getMessageId());
return null;
return path;
} }
@Override @Override
public String store(MessageTree tree) { public String store(MessageTree tree, String... tags) {
m_job.append(tree); m_job.append(tree);
// Not available // Not available
...@@ -149,4 +154,16 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos ...@@ -149,4 +154,16 @@ public class HdfsMessageStorage implements MessageStorage, Initializable, Dispos
//TODO //TODO
return null; return null;
} }
@Override
public MessageTree next(String messageId, String tag) {
// TODO Auto-generated method stub
return null;
}
@Override
public MessageTree previous(String messageId, String tag) {
// TODO Auto-generated method stub
return null;
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册