/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
package org.apache.pulsar.client.impl;
M
Matteo Merli 已提交
20

21
import com.google.common.collect.Queues;
22 23
import java.util.Collections;
import java.util.Map;
24
import java.util.Set;
M
Matteo Merli 已提交
25 26 27 28 29 30
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
31
import org.apache.pulsar.client.api.Consumer;
32
import org.apache.pulsar.client.api.ConsumerEventListener;
33 34 35 36
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
37
import org.apache.pulsar.client.api.Schema;
38
import org.apache.pulsar.client.api.SubscriptionType;
39
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
40
import org.apache.pulsar.client.util.ConsumerName;
41 42
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
43
import org.apache.pulsar.common.util.FutureUtil;
44
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
M
Matteo Merli 已提交
45

46
public abstract class ConsumerBase<T> extends HandlerBase implements Consumer<T> {
M
Matteo Merli 已提交
47 48 49 50 51 52

    enum ConsumerType {
        PARTITIONED, NON_PARTITIONED
    }

    protected final String subscription;
53
    protected final ConsumerConfigurationData<T> conf;
M
Matteo Merli 已提交
54
    protected final String consumerName;
55 56
    protected final CompletableFuture<Consumer<T>> subscribeFuture;
    protected final MessageListener<T> listener;
57
    protected final ConsumerEventListener consumerEventListener;
M
Matteo Merli 已提交
58
    protected final ExecutorService listenerExecutor;
59 60
    final BlockingQueue<Message<T>> incomingMessages;
    protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
61
    protected int maxReceiverQueueSize;
62
    protected Schema<T> schema;
M
Matteo Merli 已提交
63

64 65
    protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, int receiverQueueSize,
                           ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
66
        super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS));
67
        this.maxReceiverQueueSize = receiverQueueSize;
68
        this.subscription = conf.getSubscriptionName();
M
Matteo Merli 已提交
69
        this.conf = conf;
70
        this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
M
Matteo Merli 已提交
71 72
        this.subscribeFuture = subscribeFuture;
        this.listener = conf.getMessageListener();
73
        this.consumerEventListener = conf.getConsumerEventListener();
74
        if (receiverQueueSize <= 1) {
M
Matteo Merli 已提交
75 76
            this.incomingMessages = Queues.newArrayBlockingQueue(1);
        } else {
77
            this.incomingMessages = new GrowableArrayBlockingQueue<>();
M
Matteo Merli 已提交
78
        }
79

M
Matteo Merli 已提交
80 81
        this.listenerExecutor = listenerExecutor;
        this.pendingReceives = Queues.newConcurrentLinkedQueue();
82
        this.schema = schema;
M
Matteo Merli 已提交
83 84 85
    }

    @Override
86
    public Message<T> receive() throws PulsarClientException {
M
Matteo Merli 已提交
87 88 89 90 91
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set");
        }

92
        switch (getState()) {
M
Matteo Merli 已提交
93 94 95 96 97 98
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
            throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
99 100
        case Terminated:
            throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
M
Matteo Merli 已提交
101 102 103
        case Failed:
        case Uninitialized:
            throw new PulsarClientException.NotConnectedException();
104 105
        default:
            break;
M
Matteo Merli 已提交
106 107 108 109 110 111
        }

        return internalReceive();
    }

    @Override
112
    public CompletableFuture<Message<T>> receiveAsync() {
M
Matteo Merli 已提交
113 114

        if (listener != null) {
115
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
M
Matteo Merli 已提交
116 117 118
                    "Cannot use receive() when a listener has been set"));
        }

119
        switch (getState()) {
M
Matteo Merli 已提交
120 121 122 123 124
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
125
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer already closed"));
126 127
        case Terminated:
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Topic was terminated"));
M
Matteo Merli 已提交
128 129
        case Failed:
        case Uninitialized:
130
            return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
M
Matteo Merli 已提交
131 132 133 134 135
        }

        return internalReceiveAsync();
    }

136
    abstract protected Message<T> internalReceive() throws PulsarClientException;
M
Matteo Merli 已提交
137

138
    abstract protected CompletableFuture<Message<T>> internalReceiveAsync();
M
Matteo Merli 已提交
139 140

    @Override
141
    public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
M
Matteo Merli 已提交
142 143 144 145 146 147 148 149 150
        if (conf.getReceiverQueueSize() == 0) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Can't use receive with timeout, if the queue size is 0");
        }
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set");
        }

151
        switch (getState()) {
M
Matteo Merli 已提交
152 153 154 155 156 157
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
            throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
158 159
        case Terminated:
            throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
M
Matteo Merli 已提交
160 161 162 163 164 165 166 167
        case Failed:
        case Uninitialized:
            throw new PulsarClientException.NotConnectedException();
        }

        return internalReceive(timeout, unit);
    }

168
    abstract protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;
M
Matteo Merli 已提交
169 170

    @Override
171
    public void acknowledge(Message<?> message) throws PulsarClientException {
M
Matteo Merli 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
        try {
            acknowledge(message.getMessageId());
        } catch (NullPointerException npe) {
            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
        }
    }

    @Override
    public void acknowledge(MessageId messageId) throws PulsarClientException {
        try {
            acknowledgeAsync(messageId).get();
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException) t;
            } else {
                throw new PulsarClientException(t);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
197
    public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
M
Matteo Merli 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
        try {
            acknowledgeCumulative(message.getMessageId());
        } catch (NullPointerException npe) {
            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
        }
    }

    @Override
    public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
        try {
            acknowledgeCumulativeAsync(messageId).get();
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException) t;
            } else {
                throw new PulsarClientException(t);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
223
    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
M
Matteo Merli 已提交
224 225 226 227 228 229 230 231
        try {
            return acknowledgeAsync(message.getMessageId());
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
232
    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
M
Matteo Merli 已提交
233 234 235 236 237 238 239 240 241
        try {
            return acknowledgeCumulativeAsync(message.getMessageId());
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
242
        return doAcknowledge(messageId, AckType.Individual, Collections.emptyMap());
M
Matteo Merli 已提交
243 244 245 246
    }

    @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
247
        if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
M
Matteo Merli 已提交
248 249 250 251
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                    "Cannot use cumulative acks on a non-exclusive subscription"));
        }

252
        return doAcknowledge(messageId, AckType.Cumulative, Collections.emptyMap());
M
Matteo Merli 已提交
253 254
    }

255 256
    abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
                                                             Map<String,Long> properties);
M
Matteo Merli 已提交
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297

    @Override
    public void unsubscribe() throws PulsarClientException {
        try {
            unsubscribeAsync().get();
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException) t;
            } else {
                throw new PulsarClientException(t);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
    abstract public CompletableFuture<Void> unsubscribeAsync();

    @Override
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof PulsarClientException) {
                throw (PulsarClientException) t;
            } else {
                throw new PulsarClientException(t);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        }
    }

    @Override
    abstract public CompletableFuture<Void> closeAsync();

298 299 300 301
    private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
        return SubscriptionType.Shared != type;
    }

M
Matteo Merli 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
    protected SubType getSubType() {
        SubscriptionType type = conf.getSubscriptionType();
        switch (type) {
        case Exclusive:
            return SubType.Exclusive;

        case Shared:
            return SubType.Shared;

        case Failover:
            return SubType.Failover;
        }

        // Should not happen since we cover all cases above
        return null;
    }

    abstract public boolean isConnected();

321 322 323 324
    abstract public int getAvailablePermits();

    abstract public int numMessagesInQueue();

325
    public CompletableFuture<Consumer<T>> subscribeFuture() {
M
Matteo Merli 已提交
326 327 328 329 330 331 332 333 334 335 336 337
        return subscribeFuture;
    }

    @Override
    public String getTopic() {
        return topic;
    }

    @Override
    public String getSubscription() {
        return subscription;
    }
338 339 340 341 342 343 344

    /**
     * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
     * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
     * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
     * breaks, the messages are redelivered after reconnect.
     */
345
    protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> messageIds);
346 347 348 349 350 351 352 353 354

    @Override
    public String toString() {
        return "ConsumerBase{" +
                "subscription='" + subscription + '\'' +
                ", consumerName='" + consumerName + '\'' +
                ", topic='" + topic + '\'' +
                '}';
    }
355 356 357 358 359

    protected void setMaxReceiverQueueSize(int newSize) {
        this.maxReceiverQueueSize = newSize;
    }

M
Matteo Merli 已提交
360
}