提交 9f7461cd 编写于 作者: M Matteo Merli

Mocked timeout tests failing on slower build machine

上级 f09d5798
......@@ -296,11 +296,12 @@ public class PersistentTopics extends AdminResource {
try {
// Write the new policies to zookeeper
globalZk().setData(path("policies", property, cluster, namespace), jsonMapper().writeValueAsBytes(policies),
nodeStat.getVersion());
String namespacePath = path("policies", property, cluster, namespace);
globalZk().setData(namespacePath, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
// invalidate the local cache to force update
policiesCache().invalidate(path("policies", property, cluster, namespace));
policiesCache().invalidate(namespacePath);
globalZkCache().invalidate(namespacePath);
log.info("[{}] Successfully revoke access for role {} - destination {}", clientAppId(), role,
destinationUri);
......
......@@ -279,7 +279,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
// unloads all namespaces gracefully without disrupting mutually
unloadNamespaceBundlesGracefully();
// close replication clients
replicationClients.forEach((cluster, client) -> {
try {
......@@ -307,12 +307,14 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
* <li>Second it starts unloading namespace bundle one by one without closing the connection in order to avoid
* disruption for other namespacebundles which are sharing the same connection from the same client.</li>
* <ul>
*
*
*/
public void unloadNamespaceBundlesGracefully() {
try {
// make broker-node unavailable from the cluster
pulsar.getLoadManager().disableBroker();
if (pulsar.getLoadManager() != null) {
pulsar.getLoadManager().disableBroker();
}
// unload all namespace-bundles gracefully
long closeTopicsStartTime = System.nanoTime();
......
......@@ -259,6 +259,7 @@ public class ServerCnx extends PulsarHandler {
exception.getCause().getMessage()));
}
consumers.remove(consumerId, consumerFuture);
return null;
});
......@@ -575,11 +576,11 @@ public class ServerCnx extends PulsarHandler {
long producerId = producer.getProducerId();
producers.remove(producerId);
if(remoteEndpointProtocolVersion >= v5.getNumber()) {
ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
} else {
close();
}
}
public void closeConsumer(Consumer consumer) {
......@@ -590,12 +591,12 @@ public class ServerCnx extends PulsarHandler {
long consumerId = consumer.consumerId();
consumers.remove(consumerId);
if(remoteEndpointProtocolVersion >= v5.getNumber()) {
ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
} else {
close();
}
}
/**
* It closes the connection with client which triggers {@code channelInactive()} which clears all producers and
* consumers from connection-map
......@@ -678,4 +679,8 @@ public class ServerCnx extends PulsarHandler {
public String getRole() {
return authRole;
}
boolean hasConsumer(long consumerId) {
return consumers.containsKey(consumerId);
}
}
......@@ -155,7 +155,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
}
@Test
public void clusters() throws PulsarAdminException {
public void clusters() throws Exception {
admin.clusters().createCluster("usw",
new ClusterData("http://broker.messaging.use.example.com" + ":" + BROKER_WEBSERVICE_PORT));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
......@@ -178,6 +178,8 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
"https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
admin.clusters().deleteCluster("usw");
Thread.sleep(300);
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use"));
admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
......
......@@ -238,7 +238,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
* Validation: 1. validates active-cursor after active subscription 2. validate active-cursor with subscription 3.
* unconsumed messages should be present into cache 4. cache and active-cursor should be empty once subscription is
* closed
*
*
* @throws Exception
*/
@Test
......@@ -1033,13 +1033,14 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
Consumer consumer = pulsarClient.subscribe(topicName, "my-sub");
Message msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build();
CompletableFuture<MessageId> future1 = producer.sendAsync(msg1);
// Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're checksums
// would have already been computed. If we change the message content at that point, it should result in a
// checksum validation error
stopBroker();
Message msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build();
CompletableFuture<MessageId> future1 = producer.sendAsync(msg1);
Message msg2 = MessageBuilder.create().setContent("message-2".getBytes()).build();
CompletableFuture<MessageId> future2 = producer.sendAsync(msg2);
......
......@@ -32,6 +32,7 @@ import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
......@@ -114,6 +115,9 @@ public class ServerCnxTest {
private final String nonExistentTopicName = "persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
private final String topicWithNonLocalCluster = "persistent://prop/usw/ns-abc/successTopic";
private ManagedLedger ledgerMock = mock(ManagedLedger.class);
private ManagedCursor cursorMock = mock(ManagedCursor.class);
@BeforeMethod
public void setup() throws Exception {
svcConfig = spy(new ServiceConfiguration());
......@@ -654,6 +658,19 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
// Delay the topic creation in a deterministic way
CountDownLatch topicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
topicCreationDelayLatch.await();
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create producer
// 2. close producer (when the timeout is triggered, which may be before the producer was created on the broker
......@@ -688,6 +705,9 @@ public class ServerCnxTest {
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 2);
// Now allow topic creation to complete
topicCreationDelayLatch.countDown();
// 1st producer it's not acked
// 2nd producer fails
......@@ -719,6 +739,31 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
// Delay the topic creation in a deterministic way
CountDownLatch failedTopicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
failedTopicCreationDelayLatch.await();
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
CountDownLatch topicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
topicCreationDelayLatch.await();
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
// In a create producer timeout from client side we expect to see this sequence of commands :
// 1. create a failure producer which will timeout creation after 100msec
// 2. close producer
......@@ -741,6 +786,9 @@ public class ServerCnxTest {
producerName);
channel.writeInbound(createProducer2);
failedTopicCreationDelayLatch.countDown();
topicCreationDelayLatch.countDown();
// Close succeeds
Object response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
......@@ -752,7 +800,7 @@ public class ServerCnxTest {
assertEquals(((CommandError) response).getRequestId(), 3);
// Wait till the failtopic timeout interval
Thread.sleep(200);
Thread.sleep(500);
ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* producer id */, 4 /* request id */,
producerName);
channel.writeInbound(createProducer3);
......@@ -762,7 +810,7 @@ public class ServerCnxTest {
assertEquals(response.getClass(), CommandProducerSuccess.class);
assertEquals(((CommandProducerSuccess) response).getRequestId(), 4);
Thread.sleep(100);
Thread.sleep(500);
// We should not receive response for 1st producer, since it was cancelled by the close
assertTrue(channel.outboundMessages().isEmpty());
......@@ -776,6 +824,19 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
// Delay the topic creation in a deterministic way
CountDownLatch topicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
topicCreationDelayLatch.await();
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
// In a subscribe timeout from client side we expect to see this sequence of commands :
// 1. Subscribe
// 2. close consumer (when the timeout is triggered, which may be before the consumer was created on the broker)
......@@ -803,6 +864,8 @@ public class ServerCnxTest {
successSubName, 1 /* consumer id */, 5 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(subscribe4);
topicCreationDelayLatch.countDown();
Object response;
// Close succeeds
......@@ -837,6 +900,39 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
// Delay the topic creation in a deterministic way
CountDownLatch successTopicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
successTopicCreationDelayLatch.await();
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
CountDownLatch failedTopicCreationDelayLatch = new CountDownLatch(1);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
new Thread(() -> {
try {
failedTopicCreationDelayLatch.await();
} catch (InterruptedException e) {
}
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
}).start();
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), anyObject());
// In a subscribe timeout from client side we expect to see this sequence of commands :
// 1. Subscribe against failtopic which will fail after 100msec
// 2. close consumer
......@@ -845,7 +941,6 @@ public class ServerCnxTest {
// These operations need to be serialized, to allow the last subscribe operation to finally succeed
// (There can be more subscribe/close pairs in the sequence, depending on the client timeout
ByteBuf subscribe1 = Commands.newSubscribe(failTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(subscribe1);
......@@ -857,6 +952,9 @@ public class ServerCnxTest {
successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, "test" /* consumer name */);
channel.writeInbound(subscribe2);
successTopicCreationDelayLatch.countDown();
failedTopicCreationDelayLatch.countDown();
Object response;
// Close succeeds
......@@ -869,8 +967,9 @@ public class ServerCnxTest {
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 3);
// Wait till the failtopic timeout interval
Thread.sleep(200);
while (serverCnx.hasConsumer(1)) {
Thread.sleep(10);
}
ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, "test" /* consumer name */);
......@@ -1047,16 +1146,13 @@ public class ServerCnxTest {
}
private void setupMLAsyncCallbackMocks() {
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
final ManagedCursor cursorMock = mock(ManagedCursor.class);
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
// call openLedgerComplete with ledgerMock on ML factory asyncOpen
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Thread.sleep(100);
Thread.sleep(300);
((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
}
......@@ -1067,7 +1163,7 @@ public class ServerCnxTest {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Thread.sleep(100);
Thread.sleep(300);
new Thread(() -> {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
......@@ -1091,6 +1187,7 @@ public class ServerCnxTest {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Thread.sleep(300);
((OpenCursorCallback) invocationOnMock.getArguments()[1]).openCursorComplete(cursorMock, null);
return null;
}
......@@ -1099,6 +1196,7 @@ public class ServerCnxTest {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Thread.sleep(300);
((OpenCursorCallback) invocationOnMock.getArguments()[1])
.openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null);
return null;
......
......@@ -39,6 +39,7 @@ import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.PulsarClientException.LookupException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.ConsumerBase;
import com.yahoo.pulsar.client.impl.ProducerBase;
......@@ -374,7 +375,7 @@ public class ClientErrorsTest {
private void subscribeFailAfterRetryTimeout(String topic) throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setOperationTimeout(1, TimeUnit.SECONDS);
conf.setOperationTimeout(200, TimeUnit.MILLISECONDS);
PulsarClient client = PulsarClient.create("http://127.0.0.1:" + WEB_SERVICE_PORT, conf);
final AtomicInteger counter = new AtomicInteger(0);
......@@ -382,7 +383,7 @@ public class ClientErrorsTest {
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
if (counter.incrementAndGet() == 2) {
try {
Thread.sleep(2000);
Thread.sleep(500);
} catch (InterruptedException e) {
// do nothing
}
......@@ -393,11 +394,11 @@ public class ClientErrorsTest {
try {
ConsumerConfiguration cConf = new ConsumerConfiguration();
cConf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = client.subscribe(topic, "sub1", cConf);
client.subscribe(topic, "sub1", cConf);
fail("Should have failed");
} catch (Exception e) {
// we fail even on the retriable error
assertTrue(e instanceof PulsarClientException.LookupException);
assertEquals(e.getClass(), LookupException.class);
}
mockBrokerService.resetHandleSubscribe();
......@@ -576,6 +577,7 @@ public class ClientErrorsTest {
// close the cnx after creating the producer
channelCtx.get().channel().close();
Thread.sleep(300);
producer.send(new byte[0]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册