diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java index a03d136212efc685514044621c99e009e09bae7c..b801c69c707ec740b98e9f84d4b05e616d36edb1 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java @@ -23,7 +23,6 @@ import java.io.FileNotFoundException; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.util.ArrayList; import java.util.Map; import java.util.SortedMap; import org.apache.commons.lang3.StringUtils; @@ -210,9 +209,9 @@ public class AclUtils { // expand netaddress int separatorCount = StringUtils.countMatches(netaddress, ":"); int padCount = part - separatorCount; - if(padCount > 0){ + if (padCount > 0) { StringBuilder padStr = new StringBuilder(":"); - for(int i = 0; i < padCount; i++){ + for (int i = 0; i < padCount; i++) { padStr.append(":"); } netaddress = StringUtils.replace(netaddress, "::", padStr.toString()); @@ -221,7 +220,7 @@ public class AclUtils { // pad netaddress String[] strArray = StringUtils.splitPreserveAllTokens(netaddress, ":"); for (int i = 0; i < strArray.length; i++) { - if(strArray[i].length() < 4){ + if (strArray[i].length() < 4) { strArray[i] = StringUtils.leftPad(strArray[i], 4, '0'); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java index c6c706b4077f5c37fe666c0f72a847fa9125d68e..4bc63383954ec3c44db62b9b6398bb5dfba41c89 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java @@ -36,7 +36,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public class PullConsumer { public static void main(String[] args) throws MQClientException { - + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); Set topics = new HashSet<>(); @@ -46,99 +46,101 @@ public class PullConsumer { consumer.start(); ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { + @Override + public Thread newThread(Runnable r) { return new Thread(r, "PullConsumerThread"); } - }); - for(String topic : consumer.getRegisterTopics()){ - - executors.execute(new Runnable() { - - public void doSomething(List msgs){ - //do you business - System.out.println(msgs); - } - @Override - public void run() { - while(true){ - try { - Set messageQueues = consumer.fetchMessageQueuesInBalance(topic); - if(messageQueues == null || messageQueues.isEmpty()){ - Thread.sleep(1000); - continue; - } - PullResult pullResult = null; - for(MessageQueue messageQueue : messageQueues){ - try { - long offset = this.consumeFromOffset(messageQueue); - pullResult = consumer.pull(messageQueue, "*", offset, 32); - switch (pullResult.getPullStatus()) { - case FOUND: - List msgs = pullResult.getMsgFoundList(); - - if(msgs != null && !msgs.isEmpty()){ - this.doSomething(msgs); - //update offset to broker - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - //print pull tps - this.incPullTPS(topic, pullResult.getMsgFoundList().size()); - } - break; - case OFFSET_ILLEGAL: - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - break; - case NO_NEW_MSG: - Thread.sleep(1); - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - break; - case NO_MATCHED_MSG: - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - break; - default: - } - } catch (RemotingException e) { - e.printStackTrace(); - } catch (MQBrokerException e) { - e.printStackTrace(); - } catch (Exception e){ - e.printStackTrace(); - } - } - } catch (MQClientException e) { - //reblance error - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e){ - e.printStackTrace(); - } - } - } - - public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException{ - //-1 when started - long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if(offset < 0){ - //query from broker - offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); - } - if (offset < 0){ - //first time start from last offset - offset = consumer.maxOffset(messageQueue); + }); + for (String topic : consumer.getRegisterTopics()) { + + executors.execute(new Runnable() { + + public void doSomething(List msgs) { + //do you business + + } + + @Override + public void run() { + while (true) { + try { + Set messageQueues = consumer.fetchMessageQueuesInBalance(topic); + if (messageQueues == null || messageQueues.isEmpty()) { + Thread.sleep(1000); + continue; + } + PullResult pullResult = null; + for (MessageQueue messageQueue : messageQueues) { + try { + long offset = this.consumeFromOffset(messageQueue); + pullResult = consumer.pull(messageQueue, "*", offset, 32); + switch (pullResult.getPullStatus()) { + case FOUND: + List msgs = pullResult.getMsgFoundList(); + + if (msgs != null && !msgs.isEmpty()) { + this.doSomething(msgs); + //update offset to broker + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + //print pull tps + this.incPullTPS(topic, pullResult.getMsgFoundList().size()); + } + break; + case OFFSET_ILLEGAL: + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + break; + case NO_NEW_MSG: + Thread.sleep(1); + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + break; + case NO_MATCHED_MSG: + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + break; + default: + } + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } catch (MQClientException e) { + //reblance error + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException { + //-1 when started + long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); + if (offset < 0) { + //query from broker + offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); + } + if (offset < 0) { + //first time start from last offset + offset = consumer.maxOffset(messageQueue); } //make sure - if (offset < 0){ - offset = 0; + if (offset < 0) { + offset = 0; } - return offset; - } - public void incPullTPS(String topic, int pullSize) { - consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory() - .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize); - } - }); - + return offset; + } + + public void incPullTPS(String topic, int pullSize) { + consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory() + .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize); + } + }); + } // executors.shutdown(); // consumer.shutdown();