提交 4b1868d6 编写于 作者: F Frankie Wu

revise Bucket interface

上级 2f318c12
......@@ -115,7 +115,7 @@ public class TcpSocketSender implements MessageSender, LogEnabled {
}
if (m_future != null && m_future.getChannel().isOpen()) {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(20 * 1024); // 20K
ChannelBuffer buf = ChannelBuffers.dynamicBuffer(10 * 1024); // 10K
m_codec.encode(tree, buf);
m_future.getChannel().write(buf);
......
......@@ -4,17 +4,30 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
public interface Bucket<T> extends TagThreadSupport<T> {
public interface Bucket<T> {
public void close() throws IOException;
public void deleteAndCreate() throws IOException;
public List<T> findAllByIds(List<String> ids) throws IOException;;
public List<T> findAllByIds(List<String> ids) throws IOException;
public T findById(String id) throws IOException;;
public T findById(String id) throws IOException;
public void initialize(Class<?> type, File baseDir, String logicalPath) throws IOException;
public boolean storeById(String id, T data) throws IOException;;
public void flush() throws IOException;
public boolean storeById(String id, T data, String... tags) throws IOException;;
public List<String> findAllIdsByTag(String tag) throws IOException;;
public T findNextById(String id, String tag) throws IOException;;
public T findPreviousById(String id, String tag) throws IOException;;
public static enum Direction {
FORWARD,
BACKWARD;
}
}
package com.dianping.cat.storage;
import java.io.IOException;
import java.util.List;
/**
* Map to one HDFS directory for one report.
* <p>
*
* Sample tags: "thread:101", "session:abc", "request:xyz"
*/
public interface TagThreadSupport<T> {
public boolean storeById(String id, T data, String... tags) throws IOException;;
public List<String> findAllIdsByTag(String tag) throws IOException;;
public T findNextById(String id, Direction direction, String tag) throws IOException;;
public static enum Direction {
FORWARD,
BACKWARD;
}
}
......@@ -18,11 +18,10 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.TagThreadSupport;
import com.site.helper.Joiners;
import com.site.helper.Splitters;
public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSupport<T>, LogEnabled {
public abstract class AbstractFileBucket<T> implements Bucket<T>, LogEnabled {
private static final String[] EMPTY = new String[0];
// key => offset of record
......@@ -143,8 +142,7 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
return null;
}
@Override
public T findNextById(String id, Direction direction, String tag) {
private T findNextById(String id, Direction direction, String tag) {
List<String> ids = m_tagToIds.get(tag);
if (ids != null) {
......@@ -169,6 +167,27 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
return null;
}
@Override
public T findNextById(String id, String tag) throws IOException {
return findNextById(id, Direction.BACKWARD, tag);
}
@Override
public T findPreviousById(String id, String tag) throws IOException {
return findNextById(id, Direction.FORWARD, tag);
}
@Override
public void flush() throws IOException {
m_writeLock.lock();
try {
m_writeFile.getChannel().force(true);
} finally {
m_writeLock.lock();
}
}
public Set<String> getIds() {
return m_idToOffsets.keySet();
}
......@@ -239,11 +258,6 @@ public abstract class AbstractFileBucket<T> implements Bucket<T>, TagThreadSuppo
}
}
@Override
public boolean storeById(String id, T data) {
return storeById(id, data, EMPTY);
}
/**
* Store the message in the format of:<br>
*
......
......@@ -6,7 +6,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
......@@ -62,7 +61,7 @@ public class BucketPerfTest extends ComponentTestCase {
long start = System.currentTimeMillis();
for (int i = 0; i < perfTimes; i++) {
bucket.findNextById(String.valueOf(10000000 + i), Direction.FORWARD, "pet" + (i % 1000));
bucket.findNextById(String.valueOf(10000000 + i), "pet" + (i % 1000));
}
System.out.println("testGetTagRecordPerf:" + (System.currentTimeMillis() - start));
......
......@@ -11,7 +11,6 @@ import org.junit.runners.JUnit4;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
......@@ -99,7 +98,7 @@ public class BucketTest extends ComponentTestCase {
String id = "id" + (i * groups + i);
String nextId = "id" + ((i + 1) * groups + i);
String tag = "r:" + i;
MessageTree t1 = bucket.findNextById(id, Direction.FORWARD, tag);
MessageTree t1 = bucket.findNextById(id, tag);
MessageTree t2 = bucket.findById(nextId);
Assert.assertEquals("Unable to find next message in the thread " + i + ".", t1.toString(), t2.toString());
......@@ -114,7 +113,7 @@ public class BucketTest extends ComponentTestCase {
String id = "id" + (i * groups + i);
String nextId = "id" + ((i + 1) * groups + i);
String tag = "r:" + i;
MessageTree t1 = bucket.findNextById(id, Direction.FORWARD, tag);
MessageTree t1 = bucket.findNextById(id, tag);
MessageTree t2 = bucket.findById(nextId);
Assert.assertEquals("Unable to find next message in the thread " + i + ".", t1.toString(), t2.toString());
......@@ -153,5 +152,24 @@ public class BucketTest extends ComponentTestCase {
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
for (int i = 0; i < 90; i++) {
String id = "id" + i;
String t1 = "value" + (i + 10);
String tag = "tag" + (i % 10);
String t2 = bucket.findNextById(id, tag);
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
for (int i = 10; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + (i - 10);
String tag = "tag" + (i % 10);
String t2 = bucket.findPreviousById(id, tag);
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
}
}
......@@ -10,7 +10,6 @@ import com.dianping.cat.report.page.model.spi.ModelPeriod;
import com.dianping.cat.report.page.model.spi.ModelRequest;
import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.annotation.Inject;
import com.site.web.mvc.PageHandler;
import com.site.web.mvc.annotation.InboundActionMeta;
......@@ -24,7 +23,7 @@ public class Handler implements PageHandler<Context> {
@Inject(type = ModelService.class, value = "logview")
private ModelService<String> m_service;
private String getLogView(String messageId, Direction direction, String tag) {
private String getLogView(String messageId, Boolean direction, String tag) {
if (messageId != null) {
MessageId id = MessageId.parse(messageId);
ModelPeriod period = ModelPeriod.getByTime(id.getTimestamp());
......@@ -32,7 +31,7 @@ public class Handler implements PageHandler<Context> {
.setProperty("messageId", messageId);
if (direction != null && tag != null) {
request.setProperty("direction", direction.name());
request.setProperty("direction", String.valueOf(direction));
request.setProperty("tag", tag);
}
......
package com.dianping.cat.report.page.logview;
import com.dianping.cat.report.ReportPage;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.web.mvc.ActionContext;
import com.site.web.mvc.ActionPayload;
import com.site.web.mvc.payload.annotation.FieldMeta;
......@@ -30,11 +29,11 @@ public class Payload implements ActionPayload<ReportPage, Action> {
return m_action;
}
public Direction getDirection() {
public Boolean getDirection() {
if (m_tag1 != null) {
return Direction.BACKWARD;
return false;
} else if (m_tag2 != null) {
return Direction.FORWARD;
return true;
} else {
return null;
}
......
......@@ -15,7 +15,6 @@ import com.dianping.cat.report.page.model.spi.ModelResponse;
import com.dianping.cat.report.page.model.spi.ModelService;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.dianping.cat.storage.TagThreadSupport.Direction;
import com.site.lookup.annotation.Inject;
public class LocalLogViewService implements ModelService<String> {
......@@ -42,9 +41,13 @@ public class LocalLogViewService implements ModelService<String> {
MessageTree tree = null;
if (tag != null && direction != null) {
Direction d = Direction.valueOf(direction);
Boolean d = Boolean.valueOf(direction);
tree = bucket.findNextById(messageId, d, tag);
if (d.booleanValue()) {
tree = bucket.findNextById(messageId, tag);
} else {
tree = bucket.findPreviousById(messageId, tag);
}
}
// if not found, use current instead
......
value0value1value2value3value4value5value6value7value8value9value10value11value12value13value14value15value16value17value18value19value20value21value22value23value24value25value26value27value28value29value30value31value32value33value34value35value36value37value38value39value40value41value42value43value44value45value46value47value48value49value50value51value52value53value54value55value56value57value58value59value60value61value62value63value64value65value66value67value68value69value70value71value72value73value74value75value76value77value78value79value80value81value82value83value84value85value86value87value88value89value90value91value92value93value94value95value96value97value98value99value100value101value102value103value104value105value106value107value108value109value110value111value112value113value114value115value116value117value118value119value120value121value122value123value124value125value126value127value128value129value130value131value132value133value134value135value136value137value138value139value140value141value142value143value144value145value146value147value148value149value150value151value152value153value154value155value156value157value158value159value160value161value162value163value164value165value166value167value168value169value170value171value172value173value174value175value176value177value178value179value180value181value182value183value184value185value186value187value188value189value190value191value192value193value194value195value196value197value198value199value200value201value202value203value204value205value206value207value208value209value210value211value212value213value214value215value216value217value218value219value220value221value222value223value224value225value226value227value228value229value230value231value232value233value234value235value236value237value238value239value240value241value242value243value244value245value246value247value248value249value250value251value252value253value254value255value300value301value302value303value304value305value306value307value308value309value310value311value312value313value314value315value316value317value318value319value320value321value322value323value324value325value326value327value328value329value330value331value332value333value334value335value336value337value338value339value340value341value342value343value344value345value346value347value400value500value501value502value503value504value600value700value800value900value901value902value903value904value905value906value907value908value909value910value911value912value913value914value915value916value917value918value919value920value921value922value923value924value925value926value927value928value929value930value931value932value933value934value935value936value937value938value939value940value941value942value943value944value945value946value947value948value949value950value951value952value953value954value955value956value957value958value959value960value961value962value963value964value965value966value967value968value969value970value971value972value973value974value975value976value977value978value979value980value981value982value983value984value985value986value987value988value989value990value991value992value993value994value995value996value997value998value999value256value348value349value350value351value352value353value354value401value402value403value404value405value406value407value408value409value410value411value412value413value414value415value416value417value418value419value420value421value422value423value424value425value426value427value428value429value430value431value432value433value505value506value507value508value509value510value511value512value513value514value515value516value517value518value519value520value521value522value523value524value525value526value527value528value529value530value531value532value533value534value535value536value537value538value539value540value541value542value543value544value545value546value547value548value549value550value551value552value553value554value555value556value557value558value559value560value561value562value601value602value603value604value605value606value607value608value609value610value611value612value613value614value615value616value701value702value703value704value705value706value707value708value709value710value711value712value713value714value715value716value717value718value719value720value721value722value723value724value725value726value727value728value729value730value731value732value733value734value735value736value737value738value739value740value741value742value743value744value745value746value747value748value749value750value751value752value753value754value755value801value802value803value804value805value806value807value808value809value810value811value812value813value814value815value816value817value818value819value820value821value822value823value824value825value826value827value828value829value830value831value832value833value834value835value836value837value838value839value840value841value842value843value844value845value846value847value848value849value850value851value852value853value854value855value856value857value858value859value860value861value862value863value864value865value866value867value868value869value870value871value872value873value874value875value876value877value878value879value880value881value882value883value884value885value886value887value888value889value890value891value892value893value894value895value896value897value898value899value257value258value259value260value261value262value263value264value265value266value267value268value269value270value271value272value273value274value275value276value277value278value279value280value281value282value283value284value285value286value287value288value289value290value291value292value293value294value295value296value297value298value299value355value434value435value436value437value438value439value440value441value442value443value444value445value446value447value448value449value450value451value452value453value454value455value456value457value458value459value460value461value462value463value464value465value466value467value468value469value470value471value472value473value474value475value476value477value478value479value480value481value482value483value484value485value486value487value488value489value490value491value492value493value494value495value496value497value498value499value563value617value618value619value620value621value622value623value624value625value626value627value628value629value630value631value632value633value634value635value636value637value638value639value640value756value757value758value356value641value642value564value565value566value567value568value569value570value571value572value573value574value575value576value577value578value579value580value581value582value583value584value585value586value587value588value589value590value591value592value593value594value595value596value597value598value599value759value760value761value762value763value764value765value766value767value768value769value770value771value772value773value774value775value776value777value778value779value780value781value782value783value784value785value786value787value788value789value790value791value792value793value794value795value796value797value798value799value357value643value644value645value646value647value648value649value650value651value652value653value654value655value656value657value658value659value660value661value662value663value664value665value666value667value668value669value670value671value672value673value674value675value676value677value678value679value680value681value682value683value684value685value686value687value688value689value690value691value692value693value694value695value696value697value698value699value358value359value360value361value362value363value364value365value366value367value368value369value370value371value372value373value374value375value376value377value378value379value380value381value382value383value384value385value386value387value388value389value390value391value392value393value394value395value396value397value398value399
\ No newline at end of file
package com.dianping.cat.storage.hdfs;
import java.io.File;
import junit.framework.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import com.dianping.cat.storage.Bucket;
import com.dianping.cat.storage.BucketManager;
import com.site.lookup.ComponentTestCase;
@RunWith(JUnit4.class)
public class BucketTest extends ComponentTestCase {
@Test
public void testBytesBucket() throws Exception {
BucketManager manager = lookup(BucketManager.class);
Bucket<byte[]> bucket = manager.getHdfsBucket("hdfs");
bucket.deleteAndCreate();
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
String tag = "tag" + (i % 10);
boolean success = bucket.storeById(id, t1.getBytes(), tag);
if (success) {
String t2 = new String(bucket.findById(id));
Assert.assertEquals("Unable to find data after stored it.", t1, t2);
} else {
Assert.fail("Data failed to store at i=" + i + ".");
}
}
// close and reload it, check if everything is okay
bucket.close();
bucket.initialize(byte[].class, new File("target/bucket/"), "bytes");
// store it and load it
for (int i = 0; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + i;
String t2 = new String(bucket.findById(id));
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
for (int i = 0; i < 90; i++) {
String id = "id" + i;
String t1 = "value" + (i + 10);
String tag = "tag" + (i % 10);
String t2 = new String(bucket.findNextById(id, tag));
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
for (int i = 10; i < 100; i++) {
String id = "id" + i;
String t1 = "value" + (i - 10);
String tag = "tag" + (i % 10);
String t2 = new String(bucket.findPreviousById(id, tag));
Assert.assertEquals("Unable to find data by id.", t1, t2);
}
}
}
......@@ -21,11 +21,12 @@ public class HdfsBucketTest extends ComponentTestCase {
public void testLookup() throws Exception {
BucketManager manager = lookup(BucketManager.class);
HdfsBucket bucket = (HdfsBucket) manager.getHdfsBucket("/a/b/c");
bucket.deleteAndCreate();
bucket.startWrite();
// keys must asc order for offset calculation bellow!
// keys must asc order for offset calculation bellow!
final String key1 = "12345678901234567890123456789010";
final String key2 = "12345678901234567890123456789017";
final String key3 = "12345678901234567890123456789029";
......
......@@ -33,7 +33,7 @@
<dependency>
<groupId>com.site.common</groupId>
<artifactId>lookup</artifactId>
<version>1.0.3</version>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册