ConsumerBase.java 13.4 KB
Newer Older
M
Matteo Merli 已提交
1
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
package org.apache.pulsar.client.impl;
M
Matteo Merli 已提交
20

21
import com.google.common.collect.Queues;
22 23
import java.util.Collections;
import java.util.Map;
24
import java.util.Set;
M
Matteo Merli 已提交
25 26 27 28 29 30
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
31
import org.apache.pulsar.client.api.Consumer;
32
import org.apache.pulsar.client.api.ConsumerEventListener;
33 34 35 36
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
37
import org.apache.pulsar.client.api.Schema;
38
import org.apache.pulsar.client.api.SubscriptionType;
39
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
40
import org.apache.pulsar.client.util.ConsumerName;
41 42
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
43
import org.apache.pulsar.common.util.FutureUtil;
44
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
M
Matteo Merli 已提交
45

46
public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
M
Matteo Merli 已提交
47 48 49 50 51 52

    /**
     * Distinguishes the two consumer flavours named by this enum. How each constant is used is decided by code
     * outside this declaration.
     */
    enum ConsumerType {
        PARTITIONED,
        NON_PARTITIONED
    }

    protected final String subscription;
53
    protected final ConsumerConfigurationData<T> conf;
M
Matteo Merli 已提交
54
    protected final String consumerName;
55 56
    protected final CompletableFuture<Consumer<T>> subscribeFuture;
    protected final MessageListener<T> listener;
57
    protected final ConsumerEventListener consumerEventListener;
M
Matteo Merli 已提交
58
    protected final ExecutorService listenerExecutor;
59 60
    final BlockingQueue<Message<T>> incomingMessages;
    protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
61
    protected int maxReceiverQueueSize;
62
    protected final Schema<T> schema;
63
    protected final ConsumerInterceptors<T> interceptors;
M
Matteo Merli 已提交
64

65 66
    protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                           int receiverQueueSize, ExecutorService listenerExecutor,
67
                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
68
        super(client, topic);
69
        this.maxReceiverQueueSize = receiverQueueSize;
70
        this.subscription = conf.getSubscriptionName();
M
Matteo Merli 已提交
71
        this.conf = conf;
72
        this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
M
Matteo Merli 已提交
73 74
        this.subscribeFuture = subscribeFuture;
        this.listener = conf.getMessageListener();
75
        this.consumerEventListener = conf.getConsumerEventListener();
76 77
        // Always use growable queue since items can exceed the advertised size
        this.incomingMessages = new GrowableArrayBlockingQueue<>();
78

M
Matteo Merli 已提交
79 80
        this.listenerExecutor = listenerExecutor;
        this.pendingReceives = Queues.newConcurrentLinkedQueue();
81
        this.schema = schema;
82
        this.interceptors = interceptors;
M
Matteo Merli 已提交
83 84 85
    }

    @Override
86
    public Message<T> receive() throws PulsarClientException {
M
Matteo Merli 已提交
87 88 89 90 91
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set");
        }

92
        switch (getState()) {
M
Matteo Merli 已提交
93 94 95 96 97 98
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
            throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
99 100
        case Terminated:
            throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
M
Matteo Merli 已提交
101 102 103
        case Failed:
        case Uninitialized:
            throw new PulsarClientException.NotConnectedException();
104 105
        default:
            break;
M
Matteo Merli 已提交
106 107 108 109 110 111
        }

        return internalReceive();
    }

    @Override
112
    public CompletableFuture<Message<T>> receiveAsync() {
M
Matteo Merli 已提交
113 114

        if (listener != null) {
115
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
M
Matteo Merli 已提交
116 117 118
                    "Cannot use receive() when a listener has been set"));
        }

119
        switch (getState()) {
M
Matteo Merli 已提交
120 121 122 123 124
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
125
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer already closed"));
126 127
        case Terminated:
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Topic was terminated"));
M
Matteo Merli 已提交
128 129
        case Failed:
        case Uninitialized:
130
            return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
M
Matteo Merli 已提交
131 132 133 134 135
        }

        return internalReceiveAsync();
    }

136
    /**
     * Subclass hook that performs the actual receive. {@link #receive()} calls it only after its listener and
     * state checks have passed.
     *
     * @return the received message
     * @throws PulsarClientException as raised by the implementation
     */
    protected abstract Message<T> internalReceive() throws PulsarClientException;
M
Matteo Merli 已提交
137

138
    /**
     * Subclass hook that performs the actual asynchronous receive. {@link #receiveAsync()} calls it only after
     * its listener and state checks have passed.
     *
     * @return a future for the received message
     */
    protected abstract CompletableFuture<Message<T>> internalReceiveAsync();
M
Matteo Merli 已提交
139 140

    @Override
141
    public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
M
Matteo Merli 已提交
142 143 144 145 146 147 148 149 150
        if (conf.getReceiverQueueSize() == 0) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Can't use receive with timeout, if the queue size is 0");
        }
        if (listener != null) {
            throw new PulsarClientException.InvalidConfigurationException(
                    "Cannot use receive() when a listener has been set");
        }

151
        switch (getState()) {
M
Matteo Merli 已提交
152 153 154 155 156 157
        case Ready:
        case Connecting:
            break; // Ok
        case Closing:
        case Closed:
            throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
158 159
        case Terminated:
            throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
M
Matteo Merli 已提交
160 161 162 163 164 165 166 167
        case Failed:
        case Uninitialized:
            throw new PulsarClientException.NotConnectedException();
        }

        return internalReceive(timeout, unit);
    }

168
    /**
     * Subclass hook that performs the actual timed receive. {@link #receive(int, TimeUnit)} calls it only after
     * its queue-size, listener and state checks have passed.
     *
     * @param timeout timeout value supplied by the caller
     * @param unit    unit of {@code timeout}
     * @return the received message; what is returned when the timeout elapses is defined by the implementation
     * @throws PulsarClientException as raised by the implementation
     */
    protected abstract Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException;
M
Matteo Merli 已提交
169 170

    @Override
171
    public void acknowledge(Message<?> message) throws PulsarClientException {
M
Matteo Merli 已提交
172 173 174 175 176 177 178 179 180 181 182
        try {
            acknowledge(message.getMessageId());
        } catch (NullPointerException npe) {
            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
        }
    }

    @Override
    public void acknowledge(MessageId messageId) throws PulsarClientException {
        try {
            acknowledgeAsync(messageId).get();
183 184
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
M
Matteo Merli 已提交
185 186 187 188
        }
    }

    @Override
189
    public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
M
Matteo Merli 已提交
190 191 192 193 194 195 196 197 198 199 200
        try {
            acknowledgeCumulative(message.getMessageId());
        } catch (NullPointerException npe) {
            throw new PulsarClientException.InvalidMessageException(npe.getMessage());
        }
    }

    @Override
    public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
        try {
            acknowledgeCumulativeAsync(messageId).get();
201 202
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
M
Matteo Merli 已提交
203 204 205 206
        }
    }

    @Override
207
    public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
M
Matteo Merli 已提交
208 209 210 211 212 213 214 215
        try {
            return acknowledgeAsync(message.getMessageId());
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
216
    public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
M
Matteo Merli 已提交
217 218 219 220 221 222 223 224 225
        try {
            return acknowledgeCumulativeAsync(message.getMessageId());
        } catch (NullPointerException npe) {
            return FutureUtil.failedFuture(new PulsarClientException.InvalidMessageException(npe.getMessage()));
        }
    }

    @Override
    public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
226
        return doAcknowledge(messageId, AckType.Individual, Collections.emptyMap());
M
Matteo Merli 已提交
227 228 229 230
    }

    @Override
    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
231
        if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
M
Matteo Merli 已提交
232 233 234 235
            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                    "Cannot use cumulative acks on a non-exclusive subscription"));
        }

236
        return doAcknowledge(messageId, AckType.Cumulative, Collections.emptyMap());
M
Matteo Merli 已提交
237 238
    }

239 240 241 242 243
    /**
     * Negatively acknowledges a message by forwarding its id to the {@code MessageId} overload.
     *
     * @param message message whose id is negatively acknowledged
     */
    @Override
    public void negativeAcknowledge(Message<?> message) {
        MessageId id = message.getMessageId();
        negativeAcknowledge(id);
    }

244
    protected abstract CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
245
                                                             Map<String,Long> properties);
M
Matteo Merli 已提交
246 247 248 249 250

    @Override
    public void unsubscribe() throws PulsarClientException {
        try {
            unsubscribeAsync().get();
251 252
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
M
Matteo Merli 已提交
253 254 255 256
        }
    }

    @Override
257
    public abstract CompletableFuture<Void> unsubscribeAsync();
M
Matteo Merli 已提交
258 259 260 261 262

    @Override
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
263 264
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
M
Matteo Merli 已提交
265 266 267 268
        }
    }

    @Override
269 270 271 272 273 274 275 276 277 278 279 280 281 282
    public abstract CompletableFuture<Void> closeAsync();


    /**
     * Synchronously fetches the last message id by blocking on {@link #getLastMessageIdAsync()}.
     *
     * @return the id the asynchronous call completes with
     * @throws PulsarClientException the exception produced by {@code PulsarClientException.unwrap} when the
     *         asynchronous operation fails or the wait is interrupted
     */
    @Override
    public MessageId getLastMessageId() throws PulsarClientException {
        try {
            return getLastMessageIdAsync().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            // Setting it again is harmless should unwrap() already do so.
            Thread.currentThread().interrupt();
            throw PulsarClientException.unwrap(e);
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /**
     * Asynchronous lookup of the last message id, implemented by subclasses. {@link #getLastMessageId()} blocks
     * on the returned future.
     *
     * @return a future for the last message id
     */
    @Override
    public abstract CompletableFuture<MessageId> getLastMessageIdAsync();
M
Matteo Merli 已提交
283

284 285 286 287
    /**
     * Tells whether cumulative acknowledgements may be used with the given subscription type.
     *
     * <p>A cumulative ack implicitly acknowledges every earlier message, which only makes sense when one
     * consumer receives the whole stream. {@code Shared} and {@code Key_Shared} both distribute the messages of
     * a subscription across several consumers, so both are rejected.
     *
     * @param type configured subscription type
     * @return {@code false} for {@code Shared} and {@code Key_Shared}, {@code true} otherwise
     */
    private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
        return SubscriptionType.Shared != type && SubscriptionType.Key_Shared != type;
    }

M
Matteo Merli 已提交
288 289 290 291 292 293 294 295 296 297 298
    protected SubType getSubType() {
        SubscriptionType type = conf.getSubscriptionType();
        switch (type) {
        case Exclusive:
            return SubType.Exclusive;

        case Shared:
            return SubType.Shared;

        case Failover:
            return SubType.Failover;
299 300 301

        case Key_Shared:
            return SubType.Key_Shared;
M
Matteo Merli 已提交
302 303 304 305 306 307
        }

        // Should not happen since we cover all cases above
        return null;
    }

308
    /**
     * Implemented by subclasses.
     *
     * @return presumably the number of permits currently available to this consumer; confirm against the
     *         implementations
     */
    public abstract int getAvailablePermits();
309

310
    /**
     * Implemented by subclasses.
     *
     * @return presumably the number of messages currently buffered by this consumer; confirm against the
     *         implementations
     */
    public abstract int numMessagesInQueue();
311

312
    public CompletableFuture<Consumer<T>> subscribeFuture() {
M
Matteo Merli 已提交
313 314 315 316 317 318 319 320 321 322 323 324
        return subscribeFuture;
    }

    /**
     * @return the topic held by this handler
     */
    @Override
    public String getTopic() {
        return this.topic;
    }

    /**
     * @return the subscription name taken from the configuration at construction time
     */
    @Override
    public String getSubscription() {
        return this.subscription;
    }
325

326
    @Override
327 328 329 330
    public String getConsumerName() {
        return this.consumerName;
    }

331 332 333 334 335 336
    /**
     * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
     * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
     * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
     * breaks, the messages are redelivered after reconnect.
     */
337
    protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> messageIds);
338 339 340 341 342 343 344 345 346

    /**
     * @return a short description listing the subscription, consumer name and topic
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ConsumerBase{");
        text.append("subscription='").append(subscription).append('\'');
        text.append(", consumerName='").append(consumerName).append('\'');
        text.append(", topic='").append(topic).append('\'');
        text.append('}');
        return text.toString();
    }
347 348 349 350 351

    /**
     * Replaces the stored maximum receiver queue size.
     *
     * @param newSize value to store in {@link #maxReceiverQueueSize}
     */
    protected void setMaxReceiverQueueSize(int newSize) {
        maxReceiverQueueSize = newSize;
    }

352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
    /**
     * Passes a message through the interceptor chain, if one is present.
     *
     * @param message message about to be consumed
     * @return the interceptors' result, or {@code message} itself when no interceptors are set
     */
    protected Message<T> beforeConsume(Message<T> message) {
        if (interceptors == null) {
            return message;
        }
        return interceptors.beforeConsume(this, message);
    }

    /**
     * Forwards an acknowledgement event to the interceptor chain; does nothing when no interceptors are set.
     *
     * @param messageId id that was acknowledged
     * @param exception value forwarded unchanged to the interceptors
     */
    protected void onAcknowledge(MessageId messageId, Throwable exception) {
        if (interceptors == null) {
            return;
        }
        interceptors.onAcknowledge(this, messageId, exception);
    }

    /**
     * Forwards a cumulative acknowledgement event to the interceptor chain; does nothing when no interceptors
     * are set.
     *
     * @param messageId id that was acknowledged cumulatively
     * @param exception value forwarded unchanged to the interceptors
     */
    protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) {
        if (interceptors == null) {
            return;
        }
        interceptors.onAcknowledgeCumulative(this, messageId, exception);
    }

372 373 374 375 376
    /**
     * Forwards a negative-acknowledgement event to the interceptor chain; does nothing when no interceptors are
     * set.
     *
     * @param messageIds ids forwarded to the interceptors
     */
    protected void onNegativeAcksSend(Set<MessageId> messageIds) {
        if (interceptors == null) {
            return;
        }
        interceptors.onNegativeAcksSend(this, messageIds);
    }
377 378 379 380 381 382

    /**
     * Forwards an ack-timeout event to the interceptor chain; does nothing when no interceptors are set.
     *
     * @param messageIds ids forwarded to the interceptors
     */
    protected void onAckTimeoutSend(Set<MessageId> messageIds) {
        if (interceptors == null) {
            return;
        }
        interceptors.onAckTimeoutSend(this, messageIds);
    }
M
Matteo Merli 已提交
383
}