提交 8c029a0d 编写于 作者: O odbozhou

1、Define constant 2、Refactoring pull message by the offset

上级 9707846a
......@@ -25,6 +25,7 @@ import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.producer.ProducerImpl;
import java.util.HashSet;
import java.util.Set;
......@@ -57,12 +58,12 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override public Consumer createConsumer() {
String consumerId = accessPointProperties.getString("CONSUMER_ID");
String consumerId = accessPointProperties.getString(NonStandardKeys.CONSUMER_ID);
String[] nsStrArr = consumerId.split("_");
if (nsStrArr.length < 2) {
return new PushConsumerImpl(accessPointProperties);
}
if ("pull".equals(nsStrArr[0])) {
if (NonStandardKeys.PULL_CONSUMER.equals(nsStrArr[0])) {
return new PullConsumerImpl(accessPointProperties);
}
return new PushConsumerImpl(accessPointProperties);
......@@ -82,24 +83,24 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
@Override
public void createNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", nsName);
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, nsName);
}
@Override
public void deleteNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", null);
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, null);
}
@Override
public void switchNamespace(String targetNamespace) {
accessPointProperties.put("CONSUMER_ID", targetNamespace);
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, targetNamespace);
}
@Override
public Set<String> listNamespaces() {
return new HashSet<String>() {
{
add(accessPointProperties.getString("CONSUMER_ID"));
add(accessPointProperties.getString(NonStandardKeys.CONSUMER_ID));
}
};
}
......
......@@ -23,6 +23,7 @@ import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
......@@ -103,8 +104,8 @@ class LocalMessageCache implements ServiceLifecycle {
MessageExt poll(final KeyValue properties) {
int currentPollTimeout = clientConfig.getOperationTimeout();
if (properties.containsKey("TIMEOUT")) {
currentPollTimeout = properties.getInt("TIMEOUT");
if (properties.containsKey(NonStandardKeys.TIMEOUT)) {
currentPollTimeout = properties.getInt(NonStandardKeys.TIMEOUT);
}
return poll(currentPollTimeout);
}
......
......@@ -31,6 +31,7 @@ import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList;
......@@ -55,6 +56,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PullConsumerImpl implements Consumer {
private static final int PULL_MAX_NUMS = 32;
private static final int PULL_MIN_NUMS = 1;
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
private boolean started = false;
......@@ -93,7 +98,7 @@ public class PullConsumerImpl implements Consumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put("CONSUMER_ID", consumerId);
properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
......@@ -111,9 +116,9 @@ public class PullConsumerImpl implements Consumer {
long offset = localMessageCache.nextPullOffset(mq);
PullResult pullResult = consumer.pull(mq, "*",
offset, localMessageCache.nextPullBatchNums());
offset, localMessageCache.nextPullBatchNums());
ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().get(mq);
.getProcessQueueTable().get(mq);
switch (pullResult.getPullStatus()) {
case FOUND:
if (pq != null) {
......@@ -229,54 +234,37 @@ public class PullConsumerImpl implements Consumer {
@Override
public Message receive(long timeout) {
KeyValue properties = new DefaultKeyValue();
properties.put("TIMEOUT", timeout);
properties.put(NonStandardKeys.TIMEOUT, timeout);
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@Override
public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
KeyValue properties = new DefaultKeyValue();
properties.put("QUEUE_NAME", queueName);
properties.put("PARTITION_ID", partitionId);
properties.put("RECEIPT_ID", receiptId);
properties.put("TIMEOUT", timeout);
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@Override
public List<Message> batchReceive(long timeout) {
KeyValue properties = new DefaultKeyValue();
properties.put("TIMEOUT", timeout);
List<MessageExt> rmqMsgs = localMessageCache.batchPoll(properties);
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
List<Message> messages = new ArrayList<>(rmqMsgs.size());
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
messages.add(bytesMessage);
MessageQueue mq = null;
mq = getQueue(queueName, partitionId, mq);
PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MIN_NUMS);
if (pullResult == null)
return null;
PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages.get(0);
}
return messages;
}
return null;
}
@Override
public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
MessageQueue mq = null;
try {
Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
for (MessageQueue messageQueue : messageQueues) {
if (messageQueue.getQueueId() == partitionId) {
mq = messageQueue;
}
}
} catch (MQClientException e) {
e.printStackTrace();
}
private PullResult getResult(long receiptId, long timeout, MessageQueue mq, int nums) {
PullResult pullResult;
try {
pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, 4 * 1024 * 1024, timeout);
pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, nums, timeout);
} catch (MQClientException e) {
log.error("A error occurred when pull message.", e);
return null;
......@@ -293,6 +281,46 @@ public class PullConsumerImpl implements Consumer {
if (null == pullResult) {
return null;
}
return pullResult;
}
private MessageQueue getQueue(String queueName, int partitionId, MessageQueue mq) {
try {
Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
for (MessageQueue messageQueue : messageQueues) {
if (messageQueue.getQueueId() == partitionId) {
mq = messageQueue;
}
}
} catch (MQClientException e) {
log.error("A error occurred when batch pull message.", e);
}
return mq;
}
@Override
public List<Message> batchReceive(long timeout) {
KeyValue properties = new DefaultKeyValue();
properties.put(NonStandardKeys.TIMEOUT, timeout);
List<MessageExt> rmqMsgs = localMessageCache.batchPoll(properties);
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
List<Message> messages = new ArrayList<>(rmqMsgs.size());
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages;
}
return null;
}
@Override
public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
MessageQueue mq = null;
mq = getQueue(queueName, partitionId, mq);
PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MAX_NUMS);
if (pullResult == null)
return null;
PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
......@@ -335,7 +363,6 @@ public class PullConsumerImpl implements Consumer {
}
}
this.started = true;
currentState = ServiceLifeState.STARTED;
}
@Override
......
......@@ -91,7 +91,7 @@ public class PushConsumerImpl implements Consumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put("CONSUMER_ID", consumerId);
properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
......
......@@ -29,4 +29,8 @@ public interface NonStandardKeys {
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
String PRODUCER_ID = "PRODUCER_ID";
String CONSUMER_ID = "CONSUMER_ID";
String TIMEOUT = "TIMEOUT";
String PULL_CONSUMER = "PULL";
String PUSH_CONSUMER = "PUSH";
}
......@@ -52,7 +52,7 @@ public class PullConsumerImplTest {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("pull_TestGroup");
resourceManager.createNamespace(NonStandardKeys.PULL_CONSUMER +"_TestGroup");
consumer = messagingAccessPoint.createConsumer();
consumer.bindQueue(queueName);
......
......@@ -49,7 +49,7 @@ public class PushConsumerImplTest {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("push_TestGroup");
resourceManager.createNamespace(NonStandardKeys.PUSH_CONSUMER + "_TestGroup");
consumer = messagingAccessPoint.createConsumer();
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册