/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.stats.client;

import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.admin.PulsarAdminException.ServerSideErrorException;
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.ServerErrorException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {

    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @AfterMethod
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }
M
Matteo Merli 已提交
65 66 67 68

    @Test
    public void testServiceException() throws Exception {
        URL url = new URL("http://localhost:15000");
69
        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(url.toString()).build();
70
        BrokerStatsImpl client = (BrokerStatsImpl) spy(admin.brokerStats());
M
Matteo Merli 已提交
71 72 73 74 75 76 77 78 79 80 81
        try {
            client.getLoadReport();
        } catch (PulsarAdminException e) {
            // Ok
        }
        try {
            client.getPendingBookieOpsStats();
        } catch (PulsarAdminException e) {
            // Ok
        }
        try {
82
            client.getBrokerResourceAvailability("prop/cluster/ns");
M
Matteo Merli 已提交
83 84 85 86 87 88 89 90 91 92 93
        } catch (PulsarAdminException e) {
            // Ok
        }
        assertTrue(client.getApiException(new ClientErrorException(403)) instanceof NotAuthorizedException);
        assertTrue(client.getApiException(new ClientErrorException(404)) instanceof NotFoundException);
        assertTrue(client.getApiException(new ClientErrorException(409)) instanceof ConflictException);
        assertTrue(client.getApiException(new ClientErrorException(412)) instanceof PreconditionFailedException);
        assertTrue(client.getApiException(new ClientErrorException(400)) instanceof PulsarAdminException);
        assertTrue(client.getApiException(new ServerErrorException(500)) instanceof ServerSideErrorException);
        assertTrue(client.getApiException(new ServerErrorException(503)) instanceof PulsarAdminException);

94 95 96
        log.info("Client: ", client);

        admin.close();
M
Matteo Merli 已提交
97 98
    }

99 100 101 102
    @Test
    public void testTopicInternalStats() throws Exception {
        log.info("-- Starting {} test --", methodName);

103
        final String topicName = "persistent://my-property/my-ns/my-topic1";
104
        final String subscriptionName = "my-subscriber-name";
105
        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
106
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
107
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
108 109 110 111 112 113
        final int numberOfMsgs = 1000;
        for (int i = 0; i < numberOfMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message.getBytes());
        }

114
        Message<byte[]> msg;
115 116 117 118 119 120 121 122
        int count = 0;
        for (int i = 0; i < numberOfMsgs; i++) {
            msg = consumer.receive(5, TimeUnit.SECONDS);
            if (msg != null && count++ % 2 == 0) {
                consumer.acknowledge(msg);
            }
        }

123
        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
124 125 126
        PersistentTopicInternalStats internalStats = topic.getInternalStats();
        CursorStats cursor = internalStats.cursors.get(subscriptionName);
        assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);
127 128
        assertTrue(cursor.totalNonContiguousDeletedMessagesRange > 0
                && (cursor.totalNonContiguousDeletedMessagesRange) < numberOfMsgs / 2);
129

130 131 132 133 134
        producer.close();
        consumer.close();
        log.info("-- Exiting {} test --", methodName);
    }

135
    private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class);
M
Matteo Merli 已提交
136
}