提交 9e31e891 编写于 作者: R Rajan 提交者: GitHub

Make EntryImpl recyclable to improve gc at broker (#318)

* Recycle EntryImpl of EntryCache

* remove position import from Entry

* Use proper ref-counting semantic for Entry and removed position-impl recycling

* release data after retain into Entry + make variables final in position

* Fixed ref-counting after reading entries
上级 1e02afb2
......@@ -15,10 +15,10 @@
*/
package org.apache.bookkeeper.mledger;
import io.netty.buffer.ByteBuf;
import com.google.common.annotations.Beta;
import io.netty.buffer.ByteBuf;
/**
* An Entry represent a ledger entry data and its associated position.
*/
......@@ -48,7 +48,8 @@ public interface Entry {
Position getPosition();
/**
* Release the resources allocated for this entry
* Release the resources (data) allocated for this entry and recycle if all the resources are deallocated (ref-count
* of data reached to 0)
*/
void release();
boolean release();
}
......@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
......@@ -118,12 +119,15 @@ public class EntryCacheImpl implements EntryCache {
entryBuf.readerIndex(readerIdx);
}
if (entries.put(entry.getPosition(), new EntryImpl(entry.getPosition(), cachedData))) {
PositionImpl position = entry.getPosition();
EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
cachedData.release();
if (entries.put(position, cacheEntry)) {
manager.entryAdded(entry.getLength());
return true;
} else {
// Buffer was not inserted into cache, we need to discard it
cachedData.release();
// entry was not inserted into cache, we need to discard it
cacheEntry.release();
return false;
}
}
......@@ -140,7 +144,6 @@ public class EntryCacheImpl implements EntryCache {
lastPosition, entriesRemoved, sizeRemoved);
}
firstPosition.recycle();
manager.entriesRemoved(sizeRemoved);
}
......@@ -157,8 +160,6 @@ public class EntryCacheImpl implements EntryCache {
ml.getName(), ledgerId, entriesRemoved, sizeRemoved);
}
firstPosition.recycle();
lastPosition.recycle();
manager.entriesRemoved(sizeRemoved);
}
......@@ -170,7 +171,7 @@ public class EntryCacheImpl implements EntryCache {
}
EntryImpl entry = entries.get(position);
if (entry != null) {
EntryImpl cachedEntry = new EntryImpl(entry);
EntryImpl cachedEntry = EntryImpl.create(entry);
entry.release();
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
......@@ -183,7 +184,11 @@ public class EntryCacheImpl implements EntryCache {
}
if (sequence.hasMoreElements()) {
EntryImpl returnEntry = new EntryImpl(sequence.nextElement());
LedgerEntry ledgerEntry = sequence.nextElement();
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);
// The EntryImpl is now the owner of the buffer, so we can release the original one
ledgerEntry.getEntryBuffer().release();
manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.mbean.addReadEntriesSample(1, returnEntry.getLength());
......@@ -212,8 +217,6 @@ public class EntryCacheImpl implements EntryCache {
}
Collection<EntryImpl> cachedEntries = entries.getRange(firstPosition, lastPosition);
firstPosition.recycle();
lastPosition.recycle();
if (cachedEntries.size() == entriesToRead) {
long totalCachedSize = 0;
......@@ -221,7 +224,7 @@ public class EntryCacheImpl implements EntryCache {
// All entries found in cache
for (EntryImpl entry : cachedEntries) {
entriesToReturn.add(new EntryImpl(entry));
entriesToReturn.add(EntryImpl.create(entry));
totalCachedSize += entry.getLength();
entry.release();
}
......@@ -261,7 +264,10 @@ public class EntryCacheImpl implements EntryCache {
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
while (sequence.hasMoreElements()) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = new EntryImpl(sequence.nextElement());
LedgerEntry ledgerEntry = sequence.nextElement();
EntryImpl entry = EntryImpl.create(ledgerEntry);
ledgerEntry.getEntryBuffer().release();
entriesToReturn.add(entry);
totalSize += entry.getLength();
......
......@@ -200,7 +200,10 @@ public class EntryCacheManager {
long totalSize = 0;
while (seq.hasMoreElements()) {
// Insert the entries at the end of the list (they will be unsorted for now)
EntryImpl entry = new EntryImpl(seq.nextElement());
LedgerEntry ledgerEntry = seq.nextElement();
EntryImpl entry = EntryImpl.create(ledgerEntry);
ledgerEntry.getEntryBuffer().release();
entries.add(entry);
totalSize += entry.getLength();
}
......
......@@ -17,41 +17,83 @@ package org.apache.bookkeeper.mledger.impl;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.ReferenceCounted;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.RecyclableDuplicateByteBuf;
import io.netty.buffer.Unpooled;
final class EntryImpl implements Entry, Comparable<EntryImpl>, ReferenceCounted {
private final PositionImpl position;
private final ByteBuf data;
EntryImpl(LedgerEntry ledgerEntry) {
this.position = new PositionImpl(ledgerEntry.getLedgerId(), ledgerEntry.getEntryId());
this.data = ledgerEntry.getEntryBuffer();
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;
final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable<EntryImpl>, ReferenceCounted {
private static final Recycler<EntryImpl> RECYCLER = new Recycler<EntryImpl>() {
@Override
protected EntryImpl newObject(Handle handle) {
return new EntryImpl(handle);
}
};
private final Handle recyclerHandle;
private long ledgerId;
private long entryId;
ByteBuf data;
public static EntryImpl create(LedgerEntry ledgerEntry) {
EntryImpl entry = RECYCLER.get();
entry.ledgerId = ledgerEntry.getLedgerId();
entry.entryId = ledgerEntry.getEntryId();
entry.data = ledgerEntry.getEntryBuffer();
entry.data.retain();
entry.setRefCnt(1);
return entry;
}
// Used just for tests
EntryImpl(long ledgerId, long entryId, byte[] data) {
this.position = new PositionImpl(ledgerId, entryId);
this.data = Unpooled.wrappedBuffer(data);
public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
EntryImpl entry = RECYCLER.get();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.data = Unpooled.wrappedBuffer(data);
entry.setRefCnt(1);
return entry;
}
EntryImpl(long ledgerId, long entryId, ByteBuf data) {
this.position = new PositionImpl(ledgerId, entryId);
this.data = data;
public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.data = data;
entry.data.retain();
entry.setRefCnt(1);
return entry;
}
EntryImpl(PositionImpl position, ByteBuf data) {
this.position = position;
this.data = data;
public static EntryImpl create(PositionImpl position, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.ledgerId = position.getLedgerId();
entry.entryId = position.getEntryId();
entry.data = data;
entry.data.retain();
entry.setRefCnt(1);
return entry;
}
EntryImpl(EntryImpl other) {
this.position = new PositionImpl(other.position);
this.data = RecyclableDuplicateByteBuf.create(other.data);
public static EntryImpl create(EntryImpl other) {
EntryImpl entry = RECYCLER.get();
entry.ledgerId = other.ledgerId;
entry.entryId = other.entryId;
entry.data = RecyclableDuplicateByteBuf.create(other.data);
entry.setRefCnt(1);
entry.data.retain();
return entry;
}
private EntryImpl(Recycler.Handle recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
@Override
......@@ -66,6 +108,7 @@ final class EntryImpl implements Entry, Comparable<EntryImpl>, ReferenceCounted
return array;
}
// Only for test
@Override
public byte[] getDataAndRelease() {
byte[] array = getData();
......@@ -80,24 +123,22 @@ final class EntryImpl implements Entry, Comparable<EntryImpl>, ReferenceCounted
@Override
public PositionImpl getPosition() {
return position;
return new PositionImpl(ledgerId, entryId);
}
@Override
public int compareTo(EntryImpl other) {
return position.compareTo(other.getPosition());
}
public void retain() {
data.retain();
return ComparisonChain.start().compare(ledgerId, other.ledgerId).compare(entryId, other.entryId).result();
}
@Override
public void release() {
protected void deallocate() {
// This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it
data.release();
data = null;
ledgerId = -1;
entryId = -1;
RECYCLER.recycle(this, recyclerHandle);
}
public int refCnt() {
return data.refCnt();
}
}
......@@ -1179,8 +1179,6 @@ public class ManagedCursorImpl implements ManagedCursor {
if (log.isDebugEnabled()) {
log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
}
oldReadPosition.recycle();
}
PositionImpl oldMarkDeletePosition = markDeletePosition;
......@@ -1211,7 +1209,6 @@ public class ManagedCursorImpl implements ManagedCursor {
// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
oldMarkDeletePosition.recycle();
return newMarkDeletePosition;
}
......@@ -1599,7 +1596,6 @@ public class ManagedCursorImpl implements ManagedCursor {
}
readPosition = newReadPosition;
oldReadPosition.recycle();
} finally {
lock.writeLock().unlock();
}
......@@ -1619,7 +1615,6 @@ public class ManagedCursorImpl implements ManagedCursor {
PositionImpl oldReadPosition = readPosition;
readPosition = newReadPosition;
oldReadPosition.recycle();
} finally {
lock.writeLock().unlock();
}
......
......@@ -1260,7 +1260,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (isCursorActive(cursor)) {
final PositionImpl lastReadPosition = PositionImpl.get(ledger.getId(), lastEntry);
discardEntriesFromCache(cursor, lastReadPosition);
lastReadPosition.recycle();
}
}
}
......
......@@ -141,7 +141,11 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
if (ml.hasActiveCursors()) {
// Avoid caching entries if no cursor has been created
ml.entryCache.insert(new EntryImpl(ledger.getId(), entryId, data));
EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
// insert
ml.entryCache.insert(entry);
entry.release();
}
// We are done using the byte buffer
......
......@@ -59,13 +59,14 @@ public class OpFindNewest implements ReadEntryCallback {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
final Position position = entry.getPosition();
switch (state) {
case checkFirst:
if (!condition.apply(entry)) {
callback.findEntryComplete(null, OpFindNewest.this.ctx);
return;
} else {
lastMatchedPosition = entry.getPosition();
lastMatchedPosition = position;
// check last entry
state = State.checkLast;
......@@ -75,7 +76,7 @@ public class OpFindNewest implements ReadEntryCallback {
break;
case checkLast:
if (condition.apply(entry)) {
callback.findEntryComplete(entry.getPosition(), OpFindNewest.this.ctx);
callback.findEntryComplete(position, OpFindNewest.this.ctx);
return;
} else {
// start binary search
......@@ -87,7 +88,7 @@ public class OpFindNewest implements ReadEntryCallback {
case searching:
if (condition.apply(entry)) {
// mid - last
lastMatchedPosition = entry.getPosition();
lastMatchedPosition = position;
min = mid();
} else {
// start - mid
......
......@@ -60,6 +60,7 @@ public class OpReadEntry implements ReadEntriesCallback {
@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
// Filter the returned entries for individual deleted messages
final Position nexReadPosition = returnedEntries.get(returnedEntries.size() - 1).getPosition().getNext();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Read entries succeeded batch_size={} cumulative_size={} requested_count={}",
cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count);
......@@ -67,7 +68,7 @@ public class OpReadEntry implements ReadEntriesCallback {
List<Entry> filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);
updateReadPosition(returnedEntries.get(returnedEntries.size() - 1).getPosition().getNext());
updateReadPosition(nexReadPosition);
checkReadCompletion();
}
......
......@@ -31,52 +31,34 @@ import io.netty.util.Recycler.Handle;
public class PositionImpl implements Position, Comparable<PositionImpl> {
private long ledgerId;
private long entryId;
private final Handle recyclerHandle;
private final long ledgerId;
private final long entryId;
public PositionImpl(PositionInfo pi) {
this.ledgerId = pi.getLedgerId();
this.entryId = pi.getEntryId();
this.recyclerHandle = null;
}
public PositionImpl(NestedPositionInfo npi) {
this.ledgerId = npi.getLedgerId();
this.entryId = npi.getEntryId();
this.recyclerHandle = null;
}
public PositionImpl(long ledgerId, long entryId) {
this.ledgerId = ledgerId;
this.entryId = entryId;
this.recyclerHandle = null;
}
public PositionImpl(PositionImpl other) {
this.ledgerId = other.ledgerId;
this.entryId = other.entryId;
this.recyclerHandle = null;
}
private PositionImpl(Handle recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
public static PositionImpl get(long ledgerId, long entryId) {
// PositionImpl position = RECYCLER.get();
// position.ledgerId = ledgerId;
// position.entryId = entryId;
// return position;
return new PositionImpl(ledgerId, entryId);
}
public static PositionImpl get(PositionImpl other) {
// PositionImpl position = RECYCLER.get();
// position.ledgerId = other.ledgerId;
// position.entryId = other.entryId;
// return position;
return new PositionImpl(other);
}
......@@ -127,16 +109,4 @@ public class PositionImpl implements Position, Comparable<PositionImpl> {
public PositionInfo getPositionInfo() {
return PositionInfo.newBuilder().setLedgerId(ledgerId).setEntryId(entryId).build();
}
private static final Recycler<PositionImpl> RECYCLER = new Recycler<PositionImpl>() {
protected PositionImpl newObject(Recycler.Handle handle) {
return new PositionImpl(handle);
}
};
public void recycle() {
if (recyclerHandle != null) {
RECYCLER.recycle(this, recyclerHandle);
}
}
}
......@@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Lists;
import io.netty.util.ReferenceCounted;
/**
* Special type of cache where get() and delete() operations can be done over a range of keys.
*
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.mledger.util;
public interface ReferenceCounted {
void retain();
void release();
}
......@@ -63,8 +63,8 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
EntryCache cache1 = cacheManager.getEntryCache(ml1);
EntryCache cache2 = cacheManager.getEntryCache(ml2);
cache1.insert(new EntryImpl(1, 1, new byte[4]));
cache1.insert(new EntryImpl(1, 0, new byte[3]));
cache1.insert(EntryImpl.create(1, 1, new byte[4]));
cache1.insert(EntryImpl.create(1, 0, new byte[3]));
assertEquals(cache1.getSize(), 7);
assertEquals(cacheManager.getSize(), 7);
......@@ -77,16 +77,16 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
cache2.insert(new EntryImpl(2, 0, new byte[1]));
cache2.insert(new EntryImpl(2, 1, new byte[1]));
cache2.insert(new EntryImpl(2, 2, new byte[1]));
cache2.insert(EntryImpl.create(2, 0, new byte[1]));
cache2.insert(EntryImpl.create(2, 1, new byte[1]));
cache2.insert(EntryImpl.create(2, 2, new byte[1]));
assertEquals(cache2.getSize(), 3);
assertEquals(cacheManager.getSize(), 10);
// Next insert should trigger a cache eviction to force the size to 8
// The algorithm should evict entries from cache1
cache2.insert(new EntryImpl(2, 3, new byte[1]));
cache2.insert(EntryImpl.create(2, 3, new byte[1]));
// Wait for eviction to be completed in background
Thread.sleep(100);
......@@ -124,13 +124,13 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
EntryCacheManager cacheManager = factory.getEntryCacheManager();
EntryCache cache1 = cacheManager.getEntryCache(ml1);
assertEquals(cache1.insert(new EntryImpl(1, 1, new byte[4])), true);
assertEquals(cache1.insert(new EntryImpl(1, 0, new byte[3])), true);
assertEquals(cache1.insert(EntryImpl.create(1, 1, new byte[4])), true);
assertEquals(cache1.insert(EntryImpl.create(1, 0, new byte[3])), true);
assertEquals(cache1.getSize(), 7);
assertEquals(cacheManager.getSize(), 7);
assertEquals(cache1.insert(new EntryImpl(1, 0, new byte[5])), false);
assertEquals(cache1.insert(EntryImpl.create(1, 0, new byte[5])), false);
assertEquals(cache1.getSize(), 7);
assertEquals(cacheManager.getSize(), 7);
......@@ -151,8 +151,8 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
assertTrue(cache1 instanceof EntryCacheManager.EntryCacheDisabled);
assertTrue(cache2 instanceof EntryCacheManager.EntryCacheDisabled);
cache1.insert(new EntryImpl(1, 1, new byte[4]));
cache1.insert(new EntryImpl(1, 0, new byte[3]));
cache1.insert(EntryImpl.create(1, 1, new byte[4]));
cache1.insert(EntryImpl.create(1, 0, new byte[3]));
assertEquals(cache1.getSize(), 0);
assertEquals(cacheManager.getSize(), 0);
......@@ -165,9 +165,9 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
assertEquals(cacheManager.mlFactoryMBean.getCacheHitsThroughput(), 0.0);
assertEquals(cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), 0);
cache2.insert(new EntryImpl(2, 0, new byte[1]));
cache2.insert(new EntryImpl(2, 1, new byte[1]));
cache2.insert(new EntryImpl(2, 2, new byte[1]));
cache2.insert(EntryImpl.create(2, 0, new byte[1]));
cache2.insert(EntryImpl.create(2, 1, new byte[1]));
cache2.insert(EntryImpl.create(2, 2, new byte[1]));
assertEquals(cache2.getSize(), 0);
assertEquals(cacheManager.getSize(), 0);
......@@ -248,7 +248,6 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
entries = c2.readEntries(10);
assertEquals(entries.size(), 10);
entries.forEach(e -> e.release());
cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 70);
......@@ -260,6 +259,7 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
PositionImpl pos = (PositionImpl) entries.get(entries.size() - 1).getPosition();
c2.setReadPosition(pos);
ledger.discardEntriesFromCache(c2, pos);
entries.forEach(e -> e.release());
cacheManager.mlFactoryMBean.refreshStats(1, TimeUnit.SECONDS);
assertEquals(cacheManager.mlFactoryMBean.getCacheUsedSize(), 0);
......
......@@ -71,7 +71,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
byte[] data = new byte[10];
for (int i = 0; i < 10; i++) {
entryCache.insert(new EntryImpl(0, i, data));
entryCache.insert(EntryImpl.create(0, i, data));
}
final CountDownLatch counter = new CountDownLatch(1);
......@@ -103,7 +103,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
byte[] data = new byte[10];
for (int i = 3; i < 10; i++) {
entryCache.insert(new EntryImpl(0, i, data));
entryCache.insert(EntryImpl.create(0, i, data));
}
final CountDownLatch counter = new CountDownLatch(1);
......@@ -131,7 +131,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
byte[] data = new byte[10];
for (int i = 0; i < 8; i++) {
entryCache.insert(new EntryImpl(0, i, data));
entryCache.insert(EntryImpl.create(0, i, data));
}
final CountDownLatch counter = new CountDownLatch(1);
......@@ -158,10 +158,10 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
entryCache.insert(new EntryImpl(0, 0, data));
entryCache.insert(new EntryImpl(0, 1, data));
entryCache.insert(new EntryImpl(0, 8, data));
entryCache.insert(new EntryImpl(0, 9, data));
entryCache.insert(EntryImpl.create(0, 0, data));
entryCache.insert(EntryImpl.create(0, 1, data));
entryCache.insert(EntryImpl.create(0, 8, data));
entryCache.insert(EntryImpl.create(0, 9, data));
final CountDownLatch counter = new CountDownLatch(1);
......@@ -187,10 +187,10 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
entryCache.insert(new EntryImpl(0, 0, data));
entryCache.insert(new EntryImpl(0, 2, data));
entryCache.insert(new EntryImpl(0, 5, data));
entryCache.insert(new EntryImpl(0, 8, data));
entryCache.insert(EntryImpl.create(0, 0, data));
entryCache.insert(EntryImpl.create(0, 2, data));
entryCache.insert(EntryImpl.create(0, 5, data));
entryCache.insert(EntryImpl.create(0, 8, data));
final CountDownLatch counter = new CountDownLatch(1);
......@@ -226,7 +226,7 @@ public class EntryCacheTest extends MockedBookKeeperTestCase {
EntryCache entryCache = cacheManager.getEntryCache(ml);
byte[] data = new byte[10];
entryCache.insert(new EntryImpl(0, 2, data));
entryCache.insert(EntryImpl.create(0, 2, data));
final CountDownLatch counter = new CountDownLatch(1);
......
......@@ -112,15 +112,15 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
List<Entry> entries = cursor.readEntries(20);
log.debug("Read {} entries", entries.size());
// Acknowledge only on last entry
Entry lastEntry = entries.get(entries.size() - 1);
cursor.markDelete(lastEntry.getPosition());
for (Entry entry : entries) {
log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
entry.release();
}
// Acknowledge only on last entry
Entry lastEntry = entries.get(entries.size() - 1);
cursor.markDelete(lastEntry.getPosition());
log.info("-----------------------");
}
......@@ -212,7 +212,6 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
List<Entry> entries = cursor.readEntries(2);
assertEquals(entries.size(), 2);
entries.forEach(e -> e.release());
assertEquals(cursor.getNumberOfEntries(), 0);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 2);
......@@ -221,6 +220,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
assertEquals(ledger.getNumberOfEntries(), 2);
assertEquals(ledger.getNumberOfActiveEntries(), 2);
cursor.markDelete(entries.get(0).getPosition());
entries.forEach(e -> e.release());
assertEquals(cursor.getNumberOfEntries(), 0);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
......@@ -278,10 +278,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
assertEquals(entries.size(), 1);
Entry entry = entries.get(0);
final Position position = entry.getPosition();
assertEquals(new String(entry.getDataAndRelease(), Encoding), "test");
log.debug("Mark-Deleting to position {}", entry.getPosition());
cursor.asyncMarkDelete(entry.getPosition(), new MarkDeleteCallback() {
log.debug("Mark-Deleting to position {}", position);
cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.debug("Mark delete complete");
......@@ -1887,23 +1888,23 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
// read 20 entries
final int readEntries = 20;
List<Entry> entries1 = cursor1.readEntries(readEntries);
// Acknowledge only on last entry
cursor1.markDelete(entries1.get(entries1.size() - 1).getPosition());
for (Entry entry : entries1) {
log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
entry.release();
}
// Acknowledge only on last entry
cursor1.markDelete(entries1.get(entries1.size() - 1).getPosition());
// read after a second: as RateLimiter limits triggering of removing cache
Thread.sleep(1000);
List<Entry> entries2 = cursor2.readEntries(readEntries);
// Acknowledge only on last entry
cursor2.markDelete((entries2.get(entries2.size() - 1)).getPosition());
for (Entry entry : entries2) {
log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
entry.release();
}
// Acknowledge only on last entry
cursor2.markDelete((entries2.get(entries2.size() - 1)).getPosition());
// (3) Validate: cache should remove all entries read by both active cursors
log.info("expected, found : {}, {}", (5 * (totalInsertedEntries - readEntries)), entryCache.getSize());
......@@ -1911,12 +1912,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
final int remainingEntries = totalInsertedEntries - readEntries;
entries1 = cursor1.readEntries(remainingEntries);
// Acknowledge only on last entry
cursor1.markDelete(entries1.get(entries1.size() - 1).getPosition());
for (Entry entry : entries1) {
log.info("Read entry. Position={} Content='{}'", entry.getPosition(), new String(entry.getData()));
entry.release();
}
// Acknowledge only on last entry
cursor1.markDelete(entries1.get(entries1.size() - 1).getPosition());
// (4) Validate: cursor2 is active cursor and has not read these entries yet: so, cache should not remove these
// entries
......
......@@ -22,30 +22,24 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
@Test
public class RangeCacheTest {
class RefString implements ReferenceCounted {
class RefString extends AbstractReferenceCounted implements ReferenceCounted {
final String s;
int count;
RefString(String s) {
super();
this.s = s;
this.count = 1;
}
@Override
public void retain() {
++count;
setRefCnt(1);
}
@Override
public void release() {
--count;
}
public int useCount() {
return count;
protected void deallocate() {
// no-op
}
@Override
......@@ -72,13 +66,13 @@ public class RangeCacheTest {
RefString s = cache.get(0);
assertEquals(s.s, "0");
assertEquals(s.count, 2);
assertEquals(s.refCnt(), 2);
s.release();
RefString s1 = cache.get(0);
RefString s2 = cache.get(0);
assertEquals(s1, s2);
assertEquals(s1.count, 3);
assertEquals(s1.refCnt(), 3);
s1.release();
s2.release();
......@@ -124,9 +118,9 @@ public class RangeCacheTest {
RangeCache<Integer, RefString> cache = new RangeCache<>();
RefString s0 = new RefString("zero");
assertEquals(s0.count, 1);
assertEquals(s0.refCnt(), 1);
assertEquals(cache.put(0, s0), true);
assertEquals(s0.count, 1);
assertEquals(s0.refCnt(), 1);
cache.put(1, new RefString("one"));
......@@ -134,12 +128,12 @@ public class RangeCacheTest {
assertEquals(cache.getNumberOfEntries(), 2);
RefString s = cache.get(1);
assertEquals(s.s, "one");
assertEquals(s.count, 2);
assertEquals(s.refCnt(), 2);
RefString s1 = new RefString("uno");
assertEquals(s1.count, 1);
assertEquals(s1.refCnt(), 1);
assertEquals(cache.put(1, s1), false);
assertEquals(s1.count, 1);
assertEquals(s1.refCnt(), 1);
s1.release();
// Should not have been overridden in cache
......
......@@ -132,7 +132,8 @@ public class Consumer {
}
/**
* Dispatch a list of entries to the consumer.
* Dispatch a list of entries to the consumer. <br/>
* <b>It is also responsible to release entries data and recycle entries object.</b>
*
* @return a promise that can be use to track when all the data has been written into the socket
*/
......@@ -174,7 +175,8 @@ public class Consumer {
.build();
ByteBuf metadataAndPayload = entry.getDataBuffer();
// increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
metadataAndPayload.retain();
// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) {
readChecksum(metadataAndPayload);
......@@ -196,6 +198,7 @@ public class Consumer {
ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise);
messageId.recycle();
messageIdBuilder.recycle();
entry.release();
}
ctx.flush();
......@@ -240,13 +243,13 @@ public class Consumer {
if (batchSize == -1) {
// this would suggest that the message might have been corrupted
iter.remove();
PositionImpl pos = (PositionImpl) entry.getPosition();
entry.release();
PositionImpl pos = PositionImpl.get((PositionImpl) entry.getPosition());
subscription.acknowledgeMessage(pos, AckType.Individual);
continue;
}
if (pendingAcks != null) {
PositionImpl pos = PositionImpl.get((PositionImpl) entry.getPosition());
PositionImpl pos = (PositionImpl) entry.getPosition();
pendingAcks.put(pos, batchSize);
}
// check if consumer supports batch message
......
......@@ -21,6 +21,7 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
......@@ -36,6 +37,7 @@ import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.Consumer;
......@@ -298,13 +300,16 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
int messagesForC = Math.min(Math.min(entriesToDispatch, c.getAvailablePermits()), MaxRoundRobinBatchSize);
if (messagesForC > 0) {
int msgSent = c.sendMessages(entries.subList(start, start + messagesForC)).getRight();
// remove positions first from replay list first : sendMessages recycles entries
if (readType == ReadType.Replay) {
entries.subList(start, start + messagesForC).forEach(entry -> {
messagesToReplay.remove((PositionImpl) entry.getPosition());
});
}
int msgSent = c.sendMessages(entries.subList(start, start + messagesForC)).getRight();
start += messagesForC;
entriesToDispatch -= messagesForC;
totalAvailablePermits -= msgSent;
......
......@@ -444,7 +444,7 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback
private void recycle() {
replicator = null;
entry = null;
entry = null; //already released and recycled on sendComplete
if (msg != null) {
msg.recycle();
msg = null;
......
......@@ -177,7 +177,6 @@ public class PersistentSubscription implements Subscription {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Mark deleted messages until position {}", topicName, subName, pos);
}
pos.recycle();
}
@Override
......@@ -196,8 +195,6 @@ public class PersistentSubscription implements Subscription {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Deleted message at {}", topicName, subName, pos);
}
pos.recycle();
}
@Override
......
......@@ -203,7 +203,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
PositionImpl position = (PositionImpl) pos;
// Message has been successfully persisted
callback.completed(null, position.getLedgerId(), position.getEntryId());
position.recycle();
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册