Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
8df2868b
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
8df2868b
编写于
10月 18, 2016
作者:
R
Rajan
提交者:
Matteo Merli
10月 18, 2016
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: flow permits on blocked consumer while redelivery of messages (#74)
上级
430eb5e0
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
207 addition
and
6 deletion
+207
-6
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java
...c/main/java/com/yahoo/pulsar/broker/service/Consumer.java
+26
-5
pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java
...m/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java
+181
-1
未找到文件。
pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java
浏览文件 @
8df2868b
...
...
@@ -23,7 +23,6 @@ import java.time.Instant;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.stream.Collectors
;
import
org.apache.bookkeeper.mledger.Entry
;
import
org.apache.bookkeeper.mledger.impl.PositionImpl
;
...
...
@@ -34,6 +33,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
com.google.common.base.Objects
;
import
com.google.common.collect.Lists
;
import
com.yahoo.pulsar.common.api.Commands
;
import
com.yahoo.pulsar.common.api.proto.PulsarApi
;
import
com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck
;
...
...
@@ -458,11 +458,32 @@ public class Consumer {
}
public
void
redeliverUnacknowledgedMessages
(
List
<
MessageIdData
>
messageIds
)
{
List
<
PositionImpl
>
pendingPositions
=
messageIds
.
stream
()
.
map
(
messageIdData
->
PositionImpl
.
get
(
messageIdData
.
getLedgerId
(),
messageIdData
.
getEntryId
()))
.
filter
(
position
->
pendingAcks
.
remove
(
position
)
!=
null
)
.
collect
(
Collectors
.
toList
());
int
totalRedeliveryMessages
=
0
;
List
<
PositionImpl
>
pendingPositions
=
Lists
.
newArrayList
();
for
(
MessageIdData
msg
:
messageIds
)
{
PositionImpl
position
=
PositionImpl
.
get
(
msg
.
getLedgerId
(),
msg
.
getEntryId
());
Integer
batchSize
=
pendingAcks
.
remove
(
position
);
if
(
batchSize
!=
null
)
{
totalRedeliveryMessages
+=
batchSize
;
pendingPositions
.
add
(
position
);
}
}
unackedMessages
.
addAndGet
(-
totalRedeliveryMessages
);
blockedConsumerOnUnackedMsgs
=
false
;
subscription
.
redeliverUnacknowledgedMessages
(
this
,
pendingPositions
);
int
numberOfBlockedPermits
=
Math
.
min
(
totalRedeliveryMessages
,
permitsReceivedWhileConsumerBlocked
.
get
());
// if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
if
(
numberOfBlockedPermits
>
0
)
{
permitsReceivedWhileConsumerBlocked
.
getAndAdd
(-
numberOfBlockedPermits
);
messagePermits
.
getAndAdd
(
numberOfBlockedPermits
);
subscription
.
consumerFlow
(
this
,
numberOfBlockedPermits
);
}
}
}
pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java
浏览文件 @
8df2868b
...
...
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
import
java.util.concurrent.Future
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.stream.Collectors
;
import
org.apache.bookkeeper.mledger.impl.EntryCacheImpl
;
import
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl
;
...
...
@@ -55,6 +56,7 @@ import com.google.common.collect.Lists;
import
com.google.common.collect.Sets
;
import
com.yahoo.pulsar.broker.service.persistent.PersistentTopic
;
import
com.yahoo.pulsar.client.impl.ConsumerImpl
;
import
com.yahoo.pulsar.client.impl.MessageIdImpl
;
import
com.yahoo.pulsar.client.util.FutureUtil
;
import
com.yahoo.pulsar.common.api.PulsarDecoder
;
...
...
@@ -1535,4 +1537,182 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
consumer
.
close
();
log
.
info
(
"-- Exiting {} test --"
,
methodName
);
}
/**
* It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets
* blocked due to unacked messsages
*
* Usecase: produce message with 10ms interval: so, consumer can consume only 10 messages without acking
*
* @throws Exception
*/
@Test
public
void
testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause
()
throws
Exception
{
log
.
info
(
"-- Starting {} test --"
,
methodName
);
int
unAckedMessages
=
pulsar
.
getConfiguration
().
getMaxUnackedMessagesPerConsumer
();
try
{
final
int
unAckedMessagesBufferSize
=
10
;
final
int
receiverQueueSize
=
20
;
final
int
totalProducedMsgs
=
20
;
pulsar
.
getConfiguration
().
setMaxUnackedMessagesPerConsumer
(
unAckedMessagesBufferSize
);
ConsumerConfiguration
conf
=
new
ConsumerConfiguration
();
conf
.
setReceiverQueueSize
(
receiverQueueSize
);
conf
.
setSubscriptionType
(
SubscriptionType
.
Shared
);
ConsumerImpl
consumer
=
(
ConsumerImpl
)
pulsarClient
.
subscribe
(
"persistent://my-property/use/my-ns/unacked-topic"
,
"subscriber-1"
,
conf
);
ProducerConfiguration
producerConf
=
new
ProducerConfiguration
();
Producer
producer
=
pulsarClient
.
createProducer
(
"persistent://my-property/use/my-ns/unacked-topic"
,
producerConf
);
// (1) Produced Messages
for
(
int
i
=
0
;
i
<
totalProducedMsgs
;
i
++)
{
String
message
=
"my-message-"
+
i
;
producer
.
send
(
message
.
getBytes
());
Thread
.
sleep
(
10
);
}
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message
msg
=
null
;
List
<
Message
>
messages1
=
Lists
.
newArrayList
();
for
(
int
i
=
0
;
i
<
totalProducedMsgs
;
i
++)
{
msg
=
consumer
.
receive
(
1
,
TimeUnit
.
SECONDS
);
if
(
msg
!=
null
)
{
messages1
.
add
(
msg
);
log
.
info
(
"Received message: "
+
new
String
(
msg
.
getData
()));
}
else
{
break
;
}
}
// client should not receive all produced messages and should be blocked due to unack-messages
assertEquals
(
messages1
.
size
(),
unAckedMessagesBufferSize
);
Set
<
MessageIdImpl
>
redeliveryMessages
=
messages1
.
stream
().
map
(
m
->
{
return
(
MessageIdImpl
)
m
.
getMessageId
();
}).
collect
(
Collectors
.
toSet
());
// (3) redeliver all consumed messages
consumer
.
redeliverUnacknowledgedMessages
(
Lists
.
newArrayList
(
redeliveryMessages
));
Thread
.
sleep
(
1000
);
Set
<
MessageIdImpl
>
messages2
=
Sets
.
newHashSet
();
for
(
int
i
=
0
;
i
<
totalProducedMsgs
;
i
++)
{
msg
=
consumer
.
receive
(
1
,
TimeUnit
.
SECONDS
);
if
(
msg
!=
null
)
{
messages2
.
add
((
MessageIdImpl
)
msg
.
getMessageId
());
log
.
info
(
"Received message: "
+
new
String
(
msg
.
getData
()));
}
else
{
break
;
}
}
assertEquals
(
messages1
.
size
(),
messages2
.
size
());
// (4) Verify: redelivered all previous unacked-consumed messages
messages2
.
removeAll
(
redeliveryMessages
);
assertEquals
(
messages2
.
size
(),
0
);
producer
.
close
();
consumer
.
close
();
log
.
info
(
"-- Exiting {} test --"
,
methodName
);
}
catch
(
Exception
e
)
{
fail
();
}
finally
{
pulsar
.
getConfiguration
().
setMaxUnackedMessagesPerConsumer
(
unAckedMessages
);
}
}
/**
* It verifies that redelivery-of-specific messages: that redelivers all those messages even when consumer gets
* blocked due to unacked messsages
*
* Usecase: Consumer starts consuming only after all messages have been produced.
* So, consumer consumes total receiver-queue-size number messages => ask for redelivery and receives all messages again.
*
* @throws Exception
*/
@Test
(
invocationCount
=
10
)
public
void
testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhileProduce
()
throws
Exception
{
log
.
info
(
"-- Starting {} test --"
,
methodName
);
int
unAckedMessages
=
pulsar
.
getConfiguration
().
getMaxUnackedMessagesPerConsumer
();
try
{
final
int
unAckedMessagesBufferSize
=
10
;
final
int
receiverQueueSize
=
20
;
final
int
totalProducedMsgs
=
50
;
pulsar
.
getConfiguration
().
setMaxUnackedMessagesPerConsumer
(
unAckedMessagesBufferSize
);
ConsumerConfiguration
conf
=
new
ConsumerConfiguration
();
conf
.
setReceiverQueueSize
(
receiverQueueSize
);
conf
.
setSubscriptionType
(
SubscriptionType
.
Shared
);
// Only subscribe consumer
ConsumerImpl
consumer
=
(
ConsumerImpl
)
pulsarClient
.
subscribe
(
"persistent://my-property/use/my-ns/unacked-topic"
,
"subscriber-1"
,
conf
);
consumer
.
close
();
ProducerConfiguration
producerConf
=
new
ProducerConfiguration
();
Producer
producer
=
pulsarClient
.
createProducer
(
"persistent://my-property/use/my-ns/unacked-topic"
,
producerConf
);
// (1) Produced Messages
for
(
int
i
=
0
;
i
<
totalProducedMsgs
;
i
++)
{
String
message
=
"my-message-"
+
i
;
producer
.
send
(
message
.
getBytes
());
Thread
.
sleep
(
10
);
}
// (1.a) start consumer again
consumer
=
(
ConsumerImpl
)
pulsarClient
.
subscribe
(
"persistent://my-property/use/my-ns/unacked-topic"
,
"subscriber-1"
,
conf
);
// (2) try to consume messages: but will be able to consume number of messages = unAckedMessagesBufferSize
Message
msg
=
null
;
List
<
Message
>
messages1
=
Lists
.
newArrayList
();
for
(
int
i
=
0
;
i
<
totalProducedMsgs
;
i
++)
{
msg
=
consumer
.
receive
(
1
,
TimeUnit
.
SECONDS
);
if
(
msg
!=
null
)
{
messages1
.
add
(
msg
);
log
.
info
(
"Received message: "
+
new
String
(
msg
.
getData
()));
}
else
{
break
;
}
}
// client should not receive all produced messages and should be blocked due to unack-messages
assertEquals
(
messages1
.
size
(),
receiverQueueSize
);
Set
<
MessageIdImpl
>
redeliveryMessages
=
messages1
.
stream
().
map
(
m
->
{
return
(
MessageIdImpl
)
m
.
getMessageId
();
}).
collect
(
Collectors
.
toSet
());
// (3) redeliver all consumed messages
consumer
.
redeliverUnacknowledgedMessages
(
Lists
.
newArrayList
(
redeliveryMessages
));
Thread
.
sleep
(
1000
);
Set
<
MessageIdImpl
>
messages2
=
Sets
.
newHashSet
();
for
(
int
i
=
0
;
i
<
totalProducedMsgs
;
i
++)
{
msg
=
consumer
.
receive
(
1
,
TimeUnit
.
SECONDS
);
if
(
msg
!=
null
)
{
messages2
.
add
((
MessageIdImpl
)
msg
.
getMessageId
());
log
.
info
(
"Received message: "
+
new
String
(
msg
.
getData
()));
}
else
{
break
;
}
}
assertEquals
(
messages1
.
size
(),
messages2
.
size
());
// (4) Verify: redelivered all previous unacked-consumed messages
messages2
.
removeAll
(
redeliveryMessages
);
assertEquals
(
messages2
.
size
(),
0
);
producer
.
close
();
consumer
.
close
();
log
.
info
(
"-- Exiting {} test --"
,
methodName
);
}
catch
(
Exception
e
)
{
fail
();
}
finally
{
pulsar
.
getConfiguration
().
setMaxUnackedMessagesPerConsumer
(
unAckedMessages
);
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录