Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
f5722415
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
f5722415
编写于
4月 12, 2017
作者:
R
Rajan
提交者:
Matteo Merli
4月 12, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Remove orphan ledger when managedCursor fails to switch ledger (#350)
上级
b5fd035d
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
86 addition
and
1 deletion
+86
-1
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
...org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+19
-1
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
...org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+67
-0
未找到文件。
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
浏览文件 @
f5722415
...
...
@@ -1842,7 +1842,25 @@ public class ManagedCursorImpl implements ManagedCursor {
log
.
debug
(
"[{}] Persisted position {} for cursor {}"
,
ledger
.
getName
(),
position
,
name
);
}
switchToNewLedger
(
lh
,
callback
);
switchToNewLedger
(
lh
,
new
VoidCallback
()
{
@Override
public
void
operationComplete
()
{
callback
.
operationComplete
();
}
@Override
public
void
operationFailed
(
ManagedLedgerException
exception
)
{
// it means it failed to switch the newly created ledger so, it should be
// deleted to prevent leak
bookkeeper
.
asyncDeleteLedger
(
lh
.
getId
(),
(
int
rc
,
Object
ctx
)
->
{
if
(
rc
!=
BKException
.
Code
.
OK
)
{
log
.
warn
(
"[{}] Failed to delete orphan ledger {}"
,
ledger
.
getName
(),
lh
.
getId
());
}
},
null
);
callback
.
operationFailed
(
exception
);
}
});
}
@Override
...
...
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
浏览文件 @
f5722415
...
...
@@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicReference;
import
java.util.stream.Collectors
;
import
org.apache.bookkeeper.client.BKException
;
import
org.apache.bookkeeper.client.BookKeeper.DigestType
;
import
org.apache.bookkeeper.client.LedgerHandle
;
import
org.apache.bookkeeper.mledger.AsyncCallbacks
;
import
org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback
;
import
org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback
;
...
...
@@ -47,6 +49,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import
org.apache.bookkeeper.mledger.Entry
;
import
org.apache.bookkeeper.mledger.ManagedCursor
;
import
org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries
;
import
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback
;
import
org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.ZNodeProtobufFormat
;
import
org.apache.bookkeeper.mledger.ManagedLedger
;
import
org.apache.bookkeeper.mledger.ManagedLedgerConfig
;
...
...
@@ -55,6 +58,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import
org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig
;
import
org.apache.bookkeeper.mledger.Position
;
import
org.apache.bookkeeper.test.MockedBookKeeperTestCase
;
import
org.apache.zookeeper.CreateMode
;
import
org.apache.zookeeper.ZooDefs
;
import
org.apache.zookeeper.data.Stat
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.testng.annotations.Factory
;
...
...
@@ -2376,5 +2382,66 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
factory2
.
shutdown
();
}
/**
* <pre>
* Verifies that {@link ManagedCursorImpl#createNewMetadataLedger()} cleans up orphan ledgers if fails to switch new
* ledger
* </pre>
* @throws Exception
*/
@Test
(
timeOut
=
5000
)
public
void
testLeakFailedLedgerOfManageCursor
()
throws
Exception
{
ManagedLedgerConfig
mlConfig
=
new
ManagedLedgerConfig
();
ManagedLedger
ledger
=
factory
.
open
(
"my_test_ledger"
,
mlConfig
);
ManagedCursorImpl
c1
=
(
ManagedCursorImpl
)
ledger
.
openCursor
(
"c1"
);
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
c1
.
createNewMetadataLedger
(
new
VoidCallback
()
{
@Override
public
void
operationComplete
()
{
latch
.
countDown
();
}
@Override
public
void
operationFailed
(
ManagedLedgerException
exception
)
{
latch
.
countDown
();
}
});
// update cursor-info with data which makes bad-version for existing managed-cursor
CountDownLatch
latch1
=
new
CountDownLatch
(
1
);
String
path
=
"/managed-ledgers/my_test_ledger/c1"
;
zkc
.
setData
(
path
,
""
.
getBytes
(),
-
1
,
(
rc
,
path1
,
ctx
,
stat
)
->
{
// updated path
latch1
.
countDown
();
},
null
);
latch1
.
await
();
// try to create ledger again which will fail because managedCursorInfo znode is already updated with different
// version so, this call will fail with BadVersionException
CountDownLatch
latch2
=
new
CountDownLatch
(
1
);
// create ledger will create ledgerId = 6
long
ledgerId
=
6
;
c1
.
createNewMetadataLedger
(
new
VoidCallback
()
{
@Override
public
void
operationComplete
()
{
latch2
.
countDown
();
}
@Override
public
void
operationFailed
(
ManagedLedgerException
exception
)
{
latch2
.
countDown
();
}
});
try
{
bkc
.
openLedgerNoRecovery
(
ledgerId
,
mlConfig
.
getDigestType
(),
mlConfig
.
getPassword
());
fail
(
"ledger should have deleted due to update-cursor failure"
);
}
catch
(
BKException
e
)
{
// ok
}
}
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
ManagedCursorTest
.
class
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录