Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
7f6d36d6
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
7f6d36d6
编写于
3月 08, 2019
作者:
M
Matteo Merli
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fixed increasing consumer permits after ack dedup operation (#3787)
上级
399b6dad
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
139 addition
and
19 deletion
+139
-19
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
...apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
+124
-0
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
...main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+2
-5
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
...main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+13
-14
未找到文件。
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java
0 → 100644
浏览文件 @
7f6d36d6
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package
org.apache.pulsar.client.impl
;
import
static
org
.
testng
.
Assert
.
assertEquals
;
import
static
org
.
testng
.
Assert
.
assertNotNull
;
import
static
org
.
testng
.
Assert
.
assertNull
;
import
java.util.concurrent.TimeUnit
;
import
lombok.Cleanup
;
import
org.apache.pulsar.client.api.Consumer
;
import
org.apache.pulsar.client.api.Message
;
import
org.apache.pulsar.client.api.Producer
;
import
org.apache.pulsar.client.api.ProducerConsumerBase
;
import
org.apache.pulsar.client.api.Schema
;
import
org.testng.annotations.AfterClass
;
import
org.testng.annotations.BeforeClass
;
import
org.testng.annotations.DataProvider
;
import
org.testng.annotations.Test
;
public
class
ConsumerDedupPermitsUpdate
extends
ProducerConsumerBase
{
@BeforeClass
@Override
protected
void
setup
()
throws
Exception
{
super
.
internalSetup
();
producerBaseSetup
();
}
@AfterClass
@Override
protected
void
cleanup
()
throws
Exception
{
super
.
internalCleanup
();
}
@DataProvider
(
name
=
"combinations"
)
public
Object
[][]
combinations
()
{
return
new
Object
[][]
{
// batching-enabled - queue-size
{
false
,
0
},
{
false
,
1
},
{
false
,
10
},
{
false
,
100
},
{
true
,
1
},
{
true
,
10
},
{
true
,
100
},
};
}
@Test
(
timeOut
=
30000
,
dataProvider
=
"combinations"
)
public
void
testConsumerDedup
(
boolean
batchingEnabled
,
int
receiverQueueSize
)
throws
Exception
{
String
topic
=
"persistent://my-property/my-ns/my-topic-"
+
System
.
nanoTime
();
@Cleanup
Consumer
<
String
>
consumer
=
pulsarClient
.
newConsumer
(
Schema
.
STRING
)
.
topic
(
topic
)
.
subscriptionName
(
"test"
)
// Use high ack delay to simulate a message being tracked as dup
.
acknowledgmentGroupTime
(
1
,
TimeUnit
.
HOURS
)
.
receiverQueueSize
(
receiverQueueSize
)
.
subscribe
();
Producer
<
String
>
producer
=
pulsarClient
.
newProducer
(
Schema
.
STRING
)
.
topic
(
topic
)
.
enableBatching
(
batchingEnabled
)
.
batchingMaxMessages
(
10
)
.
batchingMaxPublishDelay
(
1
,
TimeUnit
.
HOURS
)
.
create
();
for
(
int
i
=
0
;
i
<
30
;
i
++)
{
producer
.
sendAsync
(
"hello-"
+
i
);
}
producer
.
flush
();
// Consumer receives and acks all the messages, though the acks
// are still cached in client lib
for
(
int
i
=
0
;
i
<
30
;
i
++)
{
Message
<
String
>
msg
=
consumer
.
receive
();
assertEquals
(
msg
.
getValue
(),
"hello-"
+
i
);
consumer
.
acknowledge
(
msg
);
}
// Trigger redelivery by unloading the topic.
admin
.
topics
().
unload
(
topic
);
// Consumer dedup logic will detect the dups and not bubble them up to the application
// (With zero-queue we cannot use receive with timeout)
if
(
receiverQueueSize
>
0
)
{
Message
<
String
>
msg
=
consumer
.
receive
(
100
,
TimeUnit
.
MILLISECONDS
);
assertNull
(
msg
);
}
// The flow permits in consumer shouldn't have been messed up by the deduping
// and we should be able to get new messages through
for
(
int
i
=
0
;
i
<
30
;
i
++)
{
producer
.
sendAsync
(
"new-message-"
+
i
);
}
producer
.
flush
();
for
(
int
i
=
0
;
i
<
30
;
i
++)
{
Message
<
String
>
msg
=
consumer
.
receive
();
assertEquals
(
msg
.
getValue
(),
"new-message-"
+
i
);
consumer
.
acknowledge
(
msg
);
}
}
}
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
浏览文件 @
7f6d36d6
...
...
@@ -73,11 +73,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this
.
subscribeFuture
=
subscribeFuture
;
this
.
listener
=
conf
.
getMessageListener
();
this
.
consumerEventListener
=
conf
.
getConsumerEventListener
();
if
(
receiverQueueSize
<=
1
)
{
this
.
incomingMessages
=
Queues
.
newArrayBlockingQueue
(
1
);
}
else
{
this
.
incomingMessages
=
new
GrowableArrayBlockingQueue
<>();
}
// Always use growable queue since items can exceed the advertised size
this
.
incomingMessages
=
new
GrowableArrayBlockingQueue
<>();
this
.
listenerExecutor
=
listenerExecutor
;
this
.
pendingReceives
=
Queues
.
newConcurrentLinkedQueue
();
...
...
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
浏览文件 @
7f6d36d6
...
...
@@ -778,18 +778,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
messageId
.
getEntryId
());
}
MessageIdImpl
msgId
=
new
MessageIdImpl
(
messageId
.
getLedgerId
(),
messageId
.
getEntryId
(),
getPartitionIndex
());
if
(
acknowledgmentsGroupingTracker
.
isDuplicate
(
msgId
))
{
if
(
log
.
isDebugEnabled
())
{
log
.
debug
(
"[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}"
,
topic
,
subscription
,
msgId
);
}
if
(
conf
.
getReceiverQueueSize
()
==
0
)
{
increaseAvailablePermits
(
cnx
);
}
return
;
}
MessageMetadata
msgMetadata
=
null
;
ByteBuf
payload
=
headersAndPayload
;
...
...
@@ -806,6 +794,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return
;
}
final
int
numMessages
=
msgMetadata
.
getNumMessagesInBatch
();
MessageIdImpl
msgId
=
new
MessageIdImpl
(
messageId
.
getLedgerId
(),
messageId
.
getEntryId
(),
getPartitionIndex
());
if
(
acknowledgmentsGroupingTracker
.
isDuplicate
(
msgId
))
{
if
(
log
.
isDebugEnabled
())
{
log
.
debug
(
"[{}][{}] Ignoring message as it was already being acked earlier by same consumer {}/{}"
,
topic
,
subscription
,
msgId
);
}
increaseAvailablePermits
(
cnx
,
numMessages
);
return
;
}
ByteBuf
decryptedPayload
=
decryptPayloadIfNeeded
(
messageId
,
msgMetadata
,
payload
,
cnx
);
boolean
isMessageUndecryptable
=
isMessageUndecryptable
(
msgMetadata
);
...
...
@@ -824,8 +825,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return
;
}
final
int
numMessages
=
msgMetadata
.
getNumMessagesInBatch
();
// if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message
// and return undecrypted payload
if
(
isMessageUndecryptable
||
(
numMessages
==
1
&&
!
msgMetadata
.
hasNumMessagesInBatch
()))
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录