提交 625ba077 编写于 作者: Y yukon

Add PullConsumer related implementation for OpenMessaging.

上级 a5ea4e45
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.println("messagingAccessPoint startup OK");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));
consumer.startup();
System.out.println("consumer startup OK");
while (true) {
Message message = consumer.poll();
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
System.out.println("Received one message: " + msgId);
consumer.ack(msgId);
}
}
}
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
......@@ -47,7 +48,7 @@ public class SimplePushConsumer {
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.println("Received one message: " + message);
System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID));
context.ack();
}
});
......
......@@ -76,12 +76,12 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
@Override
public PullConsumer createPullConsumer(String queueName) {
return new PullConsumerImpl(accessPointProperties);
return new PullConsumerImpl(queueName, accessPointProperties);
}
@Override
public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
......
......@@ -25,7 +25,9 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll;
......@@ -88,7 +90,8 @@ public class OMSUtil {
}
omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
if (rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
} else {
omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
......@@ -131,4 +134,49 @@ public class OMSUtil {
}
return keyValue;
}
/**
* Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
*/
public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
return new Iterator<T>() {
Iterator<T> iterator = new Iterator<T>() {
@Override
public synchronized boolean hasNext() {
return false;
}
@Override
public synchronized T next() {
throw new NoSuchElementException();
}
@Override
public synchronized void remove() {
//Ignore
}
};
@Override
public synchronized boolean hasNext() {
return iterator.hasNext() || iterable.iterator().hasNext();
}
@Override
public synchronized T next() {
if (!iterator.hasNext()) {
iterator = iterable.iterator();
if (!iterator.hasNext()) {
throw new NoSuchElementException();
}
}
return iterator.next();
}
@Override
public synchronized void remove() {
iterator.remove();
}
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.PropertyKeys;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
class LocalMessageCache {
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
private final Map<String, ConsumeRequest> consumedRequest;
private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
private final DefaultMQPullConsumer rocketmqPullConsumer;
private int pullBatchNums = 32;
private int pollTimeout = -1;
private final static Logger log = ClientLogger.getLog();
LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) {
int cacheCapacity = 1000;
if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) {
cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY);
}
consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity);
if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) {
pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS);
}
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
}
this.consumedRequest = new ConcurrentHashMap<>();
this.pullOffsetTable = new ConcurrentHashMap<>();
this.rocketmqPullConsumer = rocketmqPullConsumer;
}
int nextPullBatchNums() {
return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity());
}
long nextPullOffset(MessageQueue remoteQueue) {
if (!pullOffsetTable.containsKey(remoteQueue)) {
try {
pullOffsetTable.putIfAbsent(remoteQueue,
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
} catch (MQClientException e) {
log.error("A error occurred in fetch consume offset process.", e);
}
}
return pullOffsetTable.get(remoteQueue);
}
void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
pullOffsetTable.put(remoteQueue, nextPullOffset);
}
void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
} catch (InterruptedException ignore) {
}
}
MessageExt poll() {
try {
ConsumeRequest consumeRequest = consumeRequestCache.take();
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
return consumeRequest.getMessageExt();
} catch (InterruptedException ignore) {
}
return null;
}
MessageExt poll(final KeyValue properties) {
int currentPollTimeout = pollTimeout;
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
}
if (currentPollTimeout == -1) {
return poll();
}
try {
ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS);
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
return consumeRequest.getMessageExt();
} catch (InterruptedException ignore) {
}
return null;
}
void ack(final String messageId) {
ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
if (consumeRequest != null) {
long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
try {
rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
}
}
}
}
......@@ -18,45 +18,142 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.OMSUtil;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
public class PullConsumerImpl implements PullConsumer {
public PullConsumerImpl(final KeyValue properties) {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
private boolean started = false;
private String targetQueueName;
private final MQPullConsumerScheduleService pullConsumerScheduleService;
private final LocalMessageCache localMessageCache;
final static Logger log = ClientLogger.getLog();
public PullConsumerImpl(final String queueName, final KeyValue properties) {
this.properties = properties;
this.targetQueueName = queueName;
String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP);
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
}
pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);
this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES);
if (maxReDeliveryTimes != 0) {
this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
}
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties);
}
@Override
public KeyValue properties() {
return null;
return properties;
}
@Override
public Message poll() {
return null;
return OMSUtil.msgConvert(localMessageCache.poll());
}
@Override
public Message poll(final KeyValue properties) {
return null;
return OMSUtil.msgConvert(localMessageCache.poll(properties));
}
@Override
public void ack(final String messageId) {
localMessageCache.ack(messageId);
}
@Override
public void ack(final String messageId, final KeyValue properties) {
localMessageCache.ack(messageId);
}
@Override
public synchronized void startup() {
if (!started) {
try {
registerPullTaskCallback();
this.pullConsumerScheduleService.start();
} catch (MQClientException e) {
throw new OMSRuntimeException("-1", e);
}
}
this.started = true;
}
private void registerPullTaskCallback() {
this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
@Override
public void startup() {
public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = localMessageCache.nextPullOffset(mq);
PullResult pullResult = consumer.pull(mq, "*",
offset, localMessageCache.nextPullBatchNums());
ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().get(mq);
switch (pullResult.getPullStatus()) {
case FOUND:
if (pq != null) {
for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
}
}
break;
default:
break;
}
localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
} catch (Exception e) {
log.error("A error occurred in pull message process.", e);
}
}
});
}
@Override
public void shutdown() {
public synchronized void shutdown() {
if (this.started) {
this.pullConsumerScheduleService.shutdown();
this.rocketmqPullConsumer.shutdown();
}
this.started = false;
}
}
......@@ -156,6 +156,8 @@ public class PushConsumerImpl implements PushConsumer {
final KeyValue contextProperties = OMS.newKeyValue();
final CountDownLatch sync = new CountDownLatch(1);
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
ReceivedMessageContext context = new ReceivedMessageContext() {
@Override
public KeyValue properties() {
......@@ -176,9 +178,13 @@ public class PushConsumerImpl implements PushConsumer {
properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
};
long begin = System.currentTimeMillis();
listener.onMessage(omsMsg, context);
long costs = System.currentTimeMillis() - begin;
try {
sync.await(PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout(), TimeUnit.MILLISECONDS);
sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout() - costs)
, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public class ConsumeRequest {
private final MessageExt messageExt;
private final MessageQueue messageQueue;
private final ProcessQueue processQueue;
private long startConsumeTimeMillis;
public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
final ProcessQueue processQueue) {
this.messageExt = messageExt;
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
public MessageExt getMessageExt() {
return messageExt;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public long getStartConsumeTimeMillis() {
return startConsumeTimeMillis;
}
public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
this.startConsumeTimeMillis = startConsumeTimeMillis;
}
}
......@@ -25,4 +25,6 @@ public interface NonStandardKeys {
String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status";
String MESSAGE_DESTINATION = "rmq.message.destination";
String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册