提交 f0fbb4da 编写于 作者: M Matteo Merli 提交者: GitHub

Extending ZK version wrapper to include ctime and mtime (#288)

上级 d1cb591d
...@@ -18,7 +18,6 @@ package org.apache.bookkeeper.mledger.impl; ...@@ -18,7 +18,6 @@ package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.ZNodeProtobufFormat;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Collections; import java.util.Collections;
...@@ -56,7 +55,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; ...@@ -56,7 +55,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Version; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.ZNodeProtobufFormat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
...@@ -105,8 +105,8 @@ public class ManagedCursorImpl implements ManagedCursor { ...@@ -105,8 +105,8 @@ public class ManagedCursorImpl implements ManagedCursor {
// Current ledger used to append the mark-delete position // Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger; private volatile LedgerHandle cursorLedger;
// Version of the cursor z-node // Stat of the cursor z-node
private volatile Version cursorLedgerVersion; private volatile Stat cursorLedgerStat;
private final RangeSet<PositionImpl> individualDeletedMessages = TreeRangeSet.create(); private final RangeSet<PositionImpl> individualDeletedMessages = TreeRangeSet.create();
private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ReadWriteLock lock = new ReentrantReadWriteLock();
...@@ -188,9 +188,9 @@ public class ManagedCursorImpl implements ManagedCursor { ...@@ -188,9 +188,9 @@ public class ManagedCursorImpl implements ManagedCursor {
log.info("[{}] Recovering from bookkeeper ledger cursor: {}", ledger.getName(), name); log.info("[{}] Recovering from bookkeeper ledger cursor: {}", ledger.getName(), name);
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() { ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() {
@Override @Override
public void operationComplete(ManagedCursorInfo info, Version version) { public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerVersion = version; cursorLedgerStat = stat;
if (info.getCursorsLedgerId() == -1L) { if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly // There is no cursor ledger to read the last position from. It means the cursor has been properly
...@@ -765,10 +765,10 @@ public class ManagedCursorImpl implements ManagedCursor { ...@@ -765,10 +765,10 @@ public class ManagedCursorImpl implements ManagedCursor {
if (cursorLedger == null) { if (cursorLedger == null) {
persistPositionMetaStore(-1, newPosition, new MetaStoreCallback<Void>() { persistPositionMetaStore(-1, newPosition, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}", log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}",
ledger.getName(), name, -1, markDeletePosition, readPosition); ledger.getName(), name, -1, markDeletePosition, readPosition);
cursorLedgerVersion = version; cursorLedgerStat = stat;
finalCallback.operationComplete(); finalCallback.operationComplete();
} }
...@@ -1684,11 +1684,11 @@ public class ManagedCursorImpl implements ManagedCursor { ...@@ -1684,11 +1684,11 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, markDeletePosition); log.debug("[{}][{}] Closing cursor at md-position: {}", ledger.getName(), name, markDeletePosition);
} }
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info.build(), cursorLedgerVersion, ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), name, info.build(), cursorLedgerStat,
new MetaStoreCallback<Void>() { new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat stat) {
callback.operationComplete(result, version); callback.operationComplete(result, stat);
} }
@Override @Override
...@@ -1739,7 +1739,7 @@ public class ManagedCursorImpl implements ManagedCursor { ...@@ -1739,7 +1739,7 @@ public class ManagedCursorImpl implements ManagedCursor {
persistPositionMetaStore(-1, markDeletePosition, new MetaStoreCallback<Void>() { persistPositionMetaStore(-1, markDeletePosition, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat stat) {
log.info("[{}][{}] Closed cursor at md-position={}", ledger.getName(), name, log.info("[{}][{}] Closed cursor at md-position={}", ledger.getName(), name,
markDeletePosition); markDeletePosition);
...@@ -1974,12 +1974,12 @@ public class ManagedCursorImpl implements ManagedCursor { ...@@ -1974,12 +1974,12 @@ public class ManagedCursorImpl implements ManagedCursor {
} }
persistPositionMetaStore(lh.getId(), markDeletePosition, new MetaStoreCallback<Void>() { persistPositionMetaStore(lh.getId(), markDeletePosition, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}", log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}",
ledger.getName(), name, lh.getId(), markDeletePosition, readPosition); ledger.getName(), name, lh.getId(), markDeletePosition, readPosition);
final LedgerHandle oldLedger = cursorLedger; final LedgerHandle oldLedger = cursorLedger;
cursorLedger = lh; cursorLedger = lh;
cursorLedgerVersion = version; cursorLedgerStat = stat;
// At this point the position had already been safely markdeleted // At this point the position had already been safely markdeleted
callback.operationComplete(); callback.operationComplete();
......
...@@ -56,7 +56,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; ...@@ -56,7 +56,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Version; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.CallbackMutex;
...@@ -95,7 +95,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -95,7 +95,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final ConcurrentLongHashMap<CompletableFuture<LedgerHandle>> ledgerCache = new ConcurrentLongHashMap<>(); private final ConcurrentLongHashMap<CompletableFuture<LedgerHandle>> ledgerCache = new ConcurrentLongHashMap<>();
private final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>(); private final NavigableMap<Long, LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
private Version ledgersVersion; private volatile Stat ledgersStat;
private final ManagedCursorContainer cursors = new ManagedCursorContainer(); private final ManagedCursorContainer cursors = new ManagedCursorContainer();
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer(); private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
...@@ -195,7 +195,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -195,7 +195,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
NUMBER_OF_ENTRIES_UPDATER.set(this, 0); NUMBER_OF_ENTRIES_UPDATER.set(this, 0);
ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0); ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0);
STATE_UPDATER.set(this, State.None); STATE_UPDATER.set(this, State.None);
this.ledgersVersion = null; this.ledgersStat = null;
this.mbean = new ManagedLedgerMBeanImpl(this); this.mbean = new ManagedLedgerMBeanImpl(this);
this.entryCache = factory.getEntryCacheManager().getEntryCache(this); this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue(); this.waitingCursors = Queues.newConcurrentLinkedQueue();
...@@ -212,8 +212,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -212,8 +212,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Fetch the list of existing ledgers in the managed ledger // Fetch the list of existing ledgers in the managed ledger
store.getManagedLedgerInfo(name, new MetaStoreCallback<ManagedLedgerInfo>() { store.getManagedLedgerInfo(name, new MetaStoreCallback<ManagedLedgerInfo>() {
@Override @Override
public void operationComplete(ManagedLedgerInfo mlInfo, Version version) { public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
ledgersVersion = version; ledgersStat = stat;
for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls); ledgers.put(ls.getLedgerId(), ls);
} }
...@@ -286,8 +286,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -286,8 +286,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final MetaStoreCallback<Void> storeLedgersCb = new MetaStoreCallback<Void>() { final MetaStoreCallback<Void> storeLedgersCb = new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void v, Version version) { public void operationComplete(Void v, Stat stat) {
ledgersVersion = version; ledgersStat = stat;
initializeCursors(callback); initializeCursors(callback);
} }
...@@ -320,7 +320,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -320,7 +320,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()) ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values())
.build(); .build();
store.asyncUpdateLedgerIds(name, mlInfo, ledgersVersion, storeLedgersCb); store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, storeLedgersCb);
})); }));
}, null); }, null);
} }
...@@ -331,7 +331,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -331,7 +331,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
} }
store.getCursors(name, new MetaStoreCallback<List<String>>() { store.getCursors(name, new MetaStoreCallback<List<String>>() {
@Override @Override
public void operationComplete(List<String> consumers, Version v) { public void operationComplete(List<String> consumers, Stat s) {
// Load existing cursors // Load existing cursors
final AtomicInteger cursorCount = new AtomicInteger(consumers.size()); final AtomicInteger cursorCount = new AtomicInteger(consumers.size());
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
...@@ -621,7 +621,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -621,7 +621,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// ledger from BK) don't, we end up having a loose ledger leaked but the state will be consistent. // ledger from BK) don't, we end up having a loose ledger leaked but the state will be consistent.
store.asyncRemoveCursor(ManagedLedgerImpl.this.name, consumerName, new MetaStoreCallback<Void>() { store.asyncRemoveCursor(ManagedLedgerImpl.this.name, consumerName, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat stat) {
cursor.asyncDeleteCursorLedger(); cursor.asyncDeleteCursorLedger();
cursors.removeCursor(consumerName); cursors.removeCursor(consumerName);
...@@ -951,13 +951,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -951,13 +951,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final MetaStoreCallback<Void> cb = new MetaStoreCallback<Void>() { final MetaStoreCallback<Void> cb = new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void v, Version version) { public void operationComplete(Void v, Stat stat) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, version); log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
} }
ledgersVersion = version; ledgersStat = stat;
ledgersListMutex.unlock(); ledgersListMutex.unlock();
updateLedgersIdsComplete(version); updateLedgersIdsComplete(stat);
synchronized (ManagedLedgerImpl.this) { synchronized (ManagedLedgerImpl.this) {
mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp, mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
TimeUnit.NANOSECONDS); TimeUnit.NANOSECONDS);
...@@ -1013,12 +1013,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -1013,12 +1013,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()).build(); ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()).build();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersVersion); log.debug("[{}] Updating ledgers ids with new ledger. version={}", name, ledgersStat);
} }
store.asyncUpdateLedgerIds(name, mlInfo, ledgersVersion, callback); store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, callback);
} }
public synchronized void updateLedgersIdsComplete(Version version) { public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened); STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = System.currentTimeMillis(); lastLedgerCreatedTimestamp = System.currentTimeMillis();
...@@ -1436,12 +1436,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -1436,12 +1436,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] Updating of ledgers list after trimming", name); log.debug("[{}] Updating of ledgers list after trimming", name);
} }
ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()).build(); ManagedLedgerInfo mlInfo = ManagedLedgerInfo.newBuilder().addAllLedgerInfo(ledgers.values()).build();
store.asyncUpdateLedgerIds(name, mlInfo, ledgersVersion, new MetaStoreCallback<Void>() { store.asyncUpdateLedgerIds(name, mlInfo, ledgersStat, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat stat) {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(), log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this)); TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
ledgersVersion = version; ledgersStat = stat;
ledgersListMutex.unlock(); ledgersListMutex.unlock();
trimmerMutex.unlock(); trimmerMutex.unlock();
...@@ -1593,7 +1593,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { ...@@ -1593,7 +1593,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private void deleteMetadata(DeleteLedgerCallback callback, Object ctx) { private void deleteMetadata(DeleteLedgerCallback callback, Object ctx) {
store.removeManagedLedger(name, new MetaStoreCallback<Void>() { store.removeManagedLedger(name, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat stat) {
log.info("[{}] Successfully deleted managed ledger", name); log.info("[{}] Successfully deleted managed ledger", name);
factory.close(ManagedLedgerImpl.this); factory.close(ManagedLedgerImpl.this);
callback.deleteLedgerComplete(ctx); callback.deleteLedgerComplete(ctx);
......
...@@ -144,7 +144,7 @@ public class ManagedLedgerOfflineBacklog { ...@@ -144,7 +144,7 @@ public class ManagedLedgerOfflineBacklog {
store.getManagedLedgerInfo(managedLedgerName, store.getManagedLedgerInfo(managedLedgerName,
new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() { new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
@Override @Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, MetaStore.Version version) { public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, MetaStore.Stat version) {
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) { for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
ledgers.put(ls.getLedgerId(), ls); ledgers.put(ls.getLedgerId(), ls);
} }
...@@ -227,7 +227,7 @@ public class ManagedLedgerOfflineBacklog { ...@@ -227,7 +227,7 @@ public class ManagedLedgerOfflineBacklog {
store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() { store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() {
@Override @Override
public void operationComplete(List<String> cursors, MetaStore.Version v) { public void operationComplete(List<String> cursors, MetaStore.Stat v) {
// Load existing cursors // Load existing cursors
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size()); log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size());
...@@ -333,7 +333,7 @@ public class ManagedLedgerOfflineBacklog { ...@@ -333,7 +333,7 @@ public class ManagedLedgerOfflineBacklog {
new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() { new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
@Override @Override
public void operationComplete(MLDataFormats.ManagedCursorInfo info, public void operationComplete(MLDataFormats.ManagedCursorInfo info,
MetaStore.Version version) { MetaStore.Stat version) {
long cursorLedgerId = info.getCursorsLedgerId(); long cursorLedgerId = info.getCursorsLedgerId();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName, log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName,
......
...@@ -26,15 +26,18 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; ...@@ -26,15 +26,18 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
*/ */
public interface MetaStore { public interface MetaStore {
public static interface Version { public static interface Stat {
int getVersion();
long getCreationTimestamp();
long getModificationTimestamp();
} }
public static interface UpdateLedgersIdsCallback { public static interface UpdateLedgersIdsCallback {
void updateLedgersIdsComplete(MetaStoreException status, Version version); void updateLedgersIdsComplete(MetaStoreException status, Stat stat);
} }
public static interface MetaStoreCallback<T> { public static interface MetaStoreCallback<T> {
void operationComplete(T result, Version version); void operationComplete(T result, Stat stat);
void operationFailed(MetaStoreException e); void operationFailed(MetaStoreException e);
} }
...@@ -63,7 +66,7 @@ public interface MetaStore { ...@@ -63,7 +66,7 @@ public interface MetaStore {
* @param ctx * @param ctx
* opaque context object * opaque context object
*/ */
void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Version version, void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat version,
MetaStoreCallback<Void> callback); MetaStoreCallback<Void> callback);
/** /**
...@@ -98,7 +101,7 @@ public interface MetaStore { ...@@ -98,7 +101,7 @@ public interface MetaStore {
* the callback * the callback
* @throws MetaStoreException * @throws MetaStoreException
*/ */
void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Version version, void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorInfo info, Stat version,
MetaStoreCallback<Void> callback); MetaStoreCallback<Void> callback);
/** /**
......
...@@ -58,11 +58,36 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -58,11 +58,36 @@ public class MetaStoreImplZookeeper implements MetaStore {
private final ZNodeProtobufFormat protobufFormat; private final ZNodeProtobufFormat protobufFormat;
private final OrderedSafeExecutor executor; private final OrderedSafeExecutor executor;
private static class ZKVersion implements Version { private static class ZKStat implements Stat {
final int version; private final int version;
private final long creationTimestamp;
private final long modificationTimestamp;
ZKStat(org.apache.zookeeper.data.Stat stat) {
this.version = stat.getVersion();
this.creationTimestamp = stat.getCtime();
this.modificationTimestamp = stat.getMtime();
}
ZKStat() {
this.version = 0;
this.creationTimestamp = System.currentTimeMillis();
this.modificationTimestamp = System.currentTimeMillis();
}
@Override
public int getVersion() {
return version;
}
@Override
public long getCreationTimestamp() {
return creationTimestamp;
}
ZKVersion(int version) { @Override
this.version = version; public long getModificationTimestamp() {
return modificationTimestamp;
} }
} }
...@@ -113,7 +138,7 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -113,7 +138,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
try { try {
ManagedLedgerInfo info = parseManagedLedgerInfo(readData); ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
info = updateMLInfoTimestamp(info); info = updateMLInfoTimestamp(info);
callback.operationComplete(info, new ZKVersion(stat.getVersion())); callback.operationComplete(info, new ZKStat(stat));
} catch (ParseException | InvalidProtocolBufferException e) { } catch (ParseException | InvalidProtocolBufferException e) {
callback.operationFailed(new MetaStoreException(e)); callback.operationFailed(new MetaStoreException(e));
} }
...@@ -123,7 +148,7 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -123,7 +148,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
StringCallback createcb = (rc1, path1, ctx1, name) -> { StringCallback createcb = (rc1, path1, ctx1, name) -> {
if (rc1 == Code.OK.intValue()) { if (rc1 == Code.OK.intValue()) {
ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance(); ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
callback.operationComplete(info, new ZKVersion(0)); callback.operationComplete(info, new ZKStat());
} else { } else {
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc1)))); callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc1))));
} }
...@@ -138,20 +163,20 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -138,20 +163,20 @@ public class MetaStoreImplZookeeper implements MetaStore {
} }
@Override @Override
public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Version version, public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat stat,
final MetaStoreCallback<Void> callback) { final MetaStoreCallback<Void> callback) {
ZKVersion zkVersion = (ZKVersion) version; ZKStat zkStat = (ZKStat) stat;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Updating metadata version={} with content={}", ledgerName, zkVersion.version, mlInfo); log.debug("[{}] Updating metadata version={} with content={}", ledgerName, zkStat.version, mlInfo);
} }
byte[] serializedMlInfo = protobufFormat == ZNodeProtobufFormat.Text ? // byte[] serializedMlInfo = protobufFormat == ZNodeProtobufFormat.Text ? //
mlInfo.toString().getBytes(Encoding) : // Text format mlInfo.toString().getBytes(Encoding) : // Text format
mlInfo.toByteArray(); // Binary format mlInfo.toByteArray(); // Binary format
zk.setData(prefix + ledgerName, serializedMlInfo, zkVersion.version, zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(),
(rc, path, zkCtx, stat) -> executor.submit(safeRun(() -> { (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName, log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName,
Code.get(rc), stat != null ? stat.getVersion() : "null"); Code.get(rc), stat != null ? stat.getVersion() : "null");
...@@ -165,7 +190,7 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -165,7 +190,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
status = new MetaStoreException(KeeperException.create(Code.get(rc))); status = new MetaStoreException(KeeperException.create(Code.get(rc)));
callback.operationFailed(status); callback.operationFailed(status);
} else { } else {
callback.operationComplete(null, new ZKVersion(stat.getVersion())); callback.operationComplete(null, new ZKStat(stat1));
} }
})), null); })), null);
} }
...@@ -187,8 +212,7 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -187,8 +212,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Get childrend completed version={}", ledgerName, stat.getVersion()); log.debug("[{}] Get childrend completed version={}", ledgerName, stat.getVersion());
} }
ZKVersion version = new ZKVersion(stat.getVersion()); callback.operationComplete(children, new ZKStat(stat));
callback.operationComplete(children, version);
})), null); })), null);
} }
...@@ -206,7 +230,7 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -206,7 +230,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
} else { } else {
try { try {
ManagedCursorInfo info = parseManagedCursorInfo(data); ManagedCursorInfo info = parseManagedCursorInfo(data);
callback.operationComplete(info, new ZKVersion(stat.getVersion())); callback.operationComplete(info, new ZKStat(stat));
} catch (ParseException | InvalidProtocolBufferException e) { } catch (ParseException | InvalidProtocolBufferException e) {
callback.operationFailed(new MetaStoreException(e)); callback.operationFailed(new MetaStoreException(e));
} }
...@@ -220,7 +244,7 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -220,7 +244,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
@Override @Override
public void asyncUpdateCursorInfo(final String ledgerName, final String cursorName, final ManagedCursorInfo info, public void asyncUpdateCursorInfo(final String ledgerName, final String cursorName, final ManagedCursorInfo info,
Version version, final MetaStoreCallback<Void> callback) { Stat stat, final MetaStoreCallback<Void> callback) {
log.info("[{}] [{}] Updating cursor info ledgerId={} mark-delete={}:{}", ledgerName, cursorName, log.info("[{}] [{}] Updating cursor info ledgerId={} mark-delete={}:{}", ledgerName, cursorName,
info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId()); info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
...@@ -229,7 +253,7 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -229,7 +253,7 @@ public class MetaStoreImplZookeeper implements MetaStore {
info.toString().getBytes(Encoding) : // Text format info.toString().getBytes(Encoding) : // Text format
info.toByteArray(); // Binary format info.toByteArray(); // Binary format
if (version == null) { if (stat == null) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info); log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
} }
...@@ -244,21 +268,21 @@ public class MetaStoreImplZookeeper implements MetaStore { ...@@ -244,21 +268,21 @@ public class MetaStoreImplZookeeper implements MetaStore {
log.debug("[{}] Created consumer {} on meta-data store with {}", ledgerName, cursorName, log.debug("[{}] Created consumer {} on meta-data store with {}", ledgerName, cursorName,
info); info);
} }
callback.operationComplete(null, new ZKVersion(0)); callback.operationComplete(null, new ZKStat());
} }
})), null); })), null);
} else { } else {
ZKVersion zkVersion = (ZKVersion) version; ZKStat zkStat = (ZKStat) stat;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info); log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info);
} }
zk.setData(path, content, zkVersion.version, (rc, path1, ctx, stat) -> executor.submit(safeRun(() -> { zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> {
if (rc == Code.BADVERSION.intValue()) { if (rc == Code.BADVERSION.intValue()) {
callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc))));
} else if (rc != Code.OK.intValue()) { } else if (rc != Code.OK.intValue()) {
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
} else { } else {
callback.operationComplete(null, new ZKVersion(stat.getVersion())); callback.operationComplete(null, new ZKStat(stat1));
} }
})), null); })), null);
} }
......
...@@ -58,7 +58,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; ...@@ -58,7 +58,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Version; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.ZNodeProtobufFormat; import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.ZNodeProtobufFormat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
...@@ -1473,7 +1473,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ...@@ -1473,7 +1473,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
final MetaStore store = factory.getMetaStore(); final MetaStore store = factory.getMetaStore();
store.getManagedLedgerInfo("my_test_ledger", new MetaStoreCallback<ManagedLedgerInfo>() { store.getManagedLedgerInfo("my_test_ledger", new MetaStoreCallback<ManagedLedgerInfo>() {
@Override @Override
public void operationComplete(ManagedLedgerInfo result, Version version) { public void operationComplete(ManagedLedgerInfo result, Stat version) {
// Update the list // Update the list
ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result); ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result);
info.clearLedgerInfo(); info.clearLedgerInfo();
...@@ -1482,7 +1482,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ...@@ -1482,7 +1482,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
store.asyncUpdateLedgerIds("my_test_ledger", info.build(), version, new MetaStoreCallback<Void>() { store.asyncUpdateLedgerIds("my_test_ledger", info.build(), version, new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat version) {
counter.countDown(); counter.countDown();
} }
...@@ -1725,7 +1725,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ...@@ -1725,7 +1725,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
@Test @Test
public void testBackwardCompatiblityForMeta() throws Exception { public void testBackwardCompatiblityForMeta() throws Exception {
final ManagedLedgerInfo[] storedMLInfo = new ManagedLedgerInfo[3]; final ManagedLedgerInfo[] storedMLInfo = new ManagedLedgerInfo[3];
final Version[] versions = new Version[1]; final Stat[] versions = new Stat[1];
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ManagedLedgerConfig conf = new ManagedLedgerConfig(); ManagedLedgerConfig conf = new ManagedLedgerConfig();
...@@ -1745,7 +1745,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ...@@ -1745,7 +1745,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
// obtain the ledger info // obtain the ledger info
store.getManagedLedgerInfo("backward_test_ledger", new MetaStoreCallback<ManagedLedgerInfo>() { store.getManagedLedgerInfo("backward_test_ledger", new MetaStoreCallback<ManagedLedgerInfo>() {
@Override @Override
public void operationComplete(ManagedLedgerInfo result, Version version) { public void operationComplete(ManagedLedgerInfo result, Stat version) {
storedMLInfo[0] = result; storedMLInfo[0] = result;
versions[0] = version; versions[0] = version;
l1.countDown(); l1.countDown();
...@@ -1774,7 +1774,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ...@@ -1774,7 +1774,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
CountDownLatch l2 = new CountDownLatch(1); CountDownLatch l2 = new CountDownLatch(1);
store.asyncUpdateLedgerIds("backward_test_ledger", storedMLInfo[1], versions[0], new MetaStoreCallback<Void>() { store.asyncUpdateLedgerIds("backward_test_ledger", storedMLInfo[1], versions[0], new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat version) {
l2.countDown(); l2.countDown();
} }
......
...@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; ...@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Version; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
...@@ -58,7 +58,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -58,7 +58,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
store.removeManagedLedger("non-existing", new MetaStoreCallback<Void>() { store.removeManagedLedger("non-existing", new MetaStoreCallback<Void>() {
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat version) {
counter.countDown(); counter.countDown();
} }
...@@ -89,7 +89,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -89,7 +89,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
latch.countDown(); latch.countDown();
} }
public void operationComplete(ManagedLedgerInfo result, Version version) { public void operationComplete(ManagedLedgerInfo result, Stat version) {
fail("Operation should have failed"); fail("Operation should have failed");
} }
}); });
...@@ -113,7 +113,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -113,7 +113,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
latch.countDown(); latch.countDown();
} }
public void operationComplete(ManagedCursorInfo result, Version version) { public void operationComplete(ManagedCursorInfo result, Stat version) {
fail("Operation should have failed"); fail("Operation should have failed");
} }
}); });
...@@ -135,7 +135,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -135,7 +135,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
latch.countDown(); latch.countDown();
} }
public void operationComplete(ManagedLedgerInfo result, Version version) { public void operationComplete(ManagedLedgerInfo result, Stat version) {
fail("Operation should have failed"); fail("Operation should have failed");
} }
}); });
...@@ -157,7 +157,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -157,7 +157,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
fail("should have succeeded"); fail("should have succeeded");
} }
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat version) {
// Update again using the version // Update again using the version
zkc.failNow(Code.CONNECTIONLOSS); zkc.failNow(Code.CONNECTIONLOSS);
...@@ -169,7 +169,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -169,7 +169,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
} }
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat version) {
fail("should have failed"); fail("should have failed");
} }
}); });
...@@ -192,7 +192,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -192,7 +192,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
fail("should have succeeded"); fail("should have succeeded");
} }
public void operationComplete(ManagedLedgerInfo mlInfo, Version version) { public void operationComplete(ManagedLedgerInfo mlInfo, Stat version) {
// Update again using the version // Update again using the version
zkc.failNow(Code.BADVERSION); zkc.failNow(Code.BADVERSION);
...@@ -203,7 +203,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase { ...@@ -203,7 +203,7 @@ public class MetaStoreImplZookeeperTest extends MockedBookKeeperTestCase {
} }
@Override @Override
public void operationComplete(Void result, Version version) { public void operationComplete(Void result, Stat version) {
fail("should have failed"); fail("should have failed");
} }
}); });
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册