diff --git a/.travis.yml b/.travis.yml index 5e6ca406db4d48069de604709b6fac044039e9e9..88058f322c943c67c5fa1edfeb30736a703ceb6d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,6 +16,8 @@ matrix: # On Linux we install latest OpenJDK 1.8 from Ubuntu repositories - name: Linux x86_64 arch: amd64 +# - name: Linux aarch64 +# arch: arm64 cache: directories: @@ -41,6 +43,7 @@ before_script: - ulimit -c unlimited script: + - mvn verify - travis_retry mvn -B clean apache-rat:check - travis_retry mvn -B package jacoco:report coveralls:report diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index cd279b92d09ffb96356616f2ca1bce94c04e953f..af14b5ba00c5ad50775c9bc1616747f5ced57134 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -95,8 +95,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc return null; } String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()); - SendMessageContext mqtraceContext; - mqtraceContext = new SendMessageContext(); + SendMessageContext mqtraceContext = new SendMessageContext(); mqtraceContext.setProducerGroup(requestHeader.getProducerGroup()); mqtraceContext.setNamespace(namespace); mqtraceContext.setTopic(requestHeader.getTopic()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 692c98c5a36f9f1ec7f7074d25e516b7cef03d4a..91463a4937b5a5b263a0881c6cbfde72f60254a5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.remoting.netty.RemotingResponseCallback; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -186,7 +187,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes + if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP; @@ -237,6 +238,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (correctTopic != null) { backTopic = correctTopic; } + if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) { + this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic()); + this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), r.getAppendMessageResult().getWroteBytes()); + this.brokerController.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId()); + this.brokerController.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), r.getAppendMessageResult().getWroteBytes()); + } this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); @@ -653,8 +660,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } private String diskUtil() { - String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + double physicRatio = 100; + String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog(); + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String storePathPhysic : paths) { + physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic)); + } String storePathLogis = StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java index b2c043ee7d97f3dedf22296b133fdd91fe93ddc8..8d7f5a136b76810a566bdf7ce189271ede2dd89d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java +++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java @@ -158,6 +158,7 @@ public class ClientConfig { this.useTLS = cc.useTLS; this.namespace = cc.namespace; this.language = cc.language; + this.mqClientApiTimeout = cc.mqClientApiTimeout; } public ClientConfig cloneClientConfig() { @@ -176,6 +177,7 @@ public class ClientConfig { cc.useTLS = useTLS; cc.namespace = namespace; cc.language = language; + cc.mqClientApiTimeout = mqClientApiTimeout; return cc; } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java index 03983413aa55ebbb490692b92e291b8e6b2ef092..236968e26b60f1ccde675d15233229eebffef413 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java @@ -47,7 +47,7 @@ public class MessageSelector { } /** - * Use SLQ92 to select message. + * Use SQL92 to select message. * * @param sql if null or empty, will be treated as select all message. */ diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 21387d17d84f6ddccd7b0d986e9dd3adc1eb4641..114fd0cf734b6dbcc43452fd5abe8fd265ebbd11 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -675,20 +675,13 @@ public class MQClientAPIImpl { retryBrokerName = mqChosen.getBrokerName(); } String addr = instance.findBrokerAddressInPublish(retryBrokerName); - log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - retryBrokerName); + log.warn(String.format("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, + retryBrokerName), e); try { request.setOpaque(RemotingCommand.createNewRequestId()); sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer); - } catch (InterruptedException e1) { - onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, false, producer); - } catch (RemotingConnectException e1) { - producer.updateFaultItem(brokerName, 3000, true); - onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, true, producer); - } catch (RemotingTooMuchRequestException e1) { + } catch (InterruptedException | RemotingTooMuchRequestException e1) { onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer); } catch (RemotingException e1) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index dafc4f8508722b01adc6bdea5fccd8fae988f3a5..874b1f8e7de06a3a55ffd65fd43fd7f78a459af9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -983,11 +983,7 @@ public class MQClientInstance { try { this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout()); log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); - } catch (RemotingException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (InterruptedException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (MQBrokerException e) { + } catch (RemotingException | InterruptedException | MQBrokerException e) { log.error("unregister client exception from broker: " + addr, e); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 046f27d23e81c5ed5cb5784b423ed28b7870887d..00df89c85bb8ffccd043bd159d79cb9416f905bd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -634,14 +634,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { default: break; } - } catch (RemotingException e) { - endTimestamp = System.currentTimeMillis(); - this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); - log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); - log.warn(msg.toString()); - exception = e; - continue; - } catch (MQClientException e) { + } catch (RemotingException | MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); @@ -936,19 +929,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { } return sendResult; - } catch (RemotingException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } catch (MQBrokerException e) { - if (this.hasSendMessageHook()) { - context.setException(e); - this.executeSendMessageHookAfter(context); - } - throw e; - } catch (InterruptedException e) { + } catch (RemotingException | MQBrokerException | InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java index be7f17d836e4eabbd979dd610ad68d87b736b4d4..8197f7a644898097e83f0291c29d443c33c718d4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java +++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java @@ -640,9 +640,6 @@ public class MQVersion { V4_9_9_SNAPSHOT, V4_9_9, - V5_0_0_PREVIEW_SNAPSHOT, - V5_0_0_PREVIEW, - V5_0_0_SNAPSHOT, V5_0_0, diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 457deb8dba8dcf347e1650aa0a11c359458eb56d..ea22aa7397f18a4899e05573f4696798ec330155 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; @@ -52,12 +51,11 @@ public class UtilAll { public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS"; public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss"; final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); + final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName(); // format: "pid@hostname" public static int getPid() { - RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); - String name = runtime.getName(); // format: "pid@hostname" try { - return Integer.parseInt(name.substring(0, name.indexOf('@'))); + return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@'))); } catch (Exception e) { return -1; } @@ -198,6 +196,11 @@ public class UtilAll { cal.get(Calendar.SECOND)); } + public static boolean isPathExists(final String path) { + File file = new File(path); + return file.exists(); + } + public static double getDiskPartitionSpaceUsedPercent(final String path) { if (null == path || path.isEmpty()) { log.error("Error when measuring disk space usage, path is null or empty, path : {}", path); diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java index 586689637b3c826b90da37c1675fc713eed2eaed..19346e6bca21316ba463734e069b573a361b5b14 100644 --- a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java @@ -28,4 +28,16 @@ public class RemotingUtilTest { assertThat(localAddress).isNotNull(); assertThat(localAddress.length()).isGreaterThan(0); } + + @Test + public void testConvert2IpStringWithIp() { + String result = RemotingUtil.convert2IpString("127.0.0.1:9876"); + assertThat(result).isEqualTo("127.0.0.1:9876"); + } + + @Test + public void testConvert2IpStringWithHost() { + String result = RemotingUtil.convert2IpString("localhost:9876"); + assertThat(result).isEqualTo("127.0.0.1:9876"); + } } diff --git a/distribution/bin/export.sh b/distribution/bin/export.sh new file mode 100644 index 0000000000000000000000000000000000000000..2b323e8b21d3d0a6b225abd5a10e5f749827c442 --- /dev/null +++ b/distribution/bin/export.sh @@ -0,0 +1,89 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ -z "$ROCKETMQ_HOME" ]; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ]; do + ls=$(ls -ld "$PRG") + link=$(expr "$ls" : '.*-> \(.*\)$') + if expr "$link" : '/.*' >/dev/null; then + PRG="$link" + else + PRG="$(dirname "$PRG")/$link" + fi + done + + saveddir=$(pwd) + + ROCKETMQ_HOME=$(dirname "$PRG")/.. + + # make it fully qualified + ROCKETMQ_HOME=$(cd "$ROCKETMQ_HOME" && pwd) + + cd "$saveddir" +fi + +export ROCKETMQ_HOME + +namesrvAddr= +while [ -z "${namesrvAddr}" ]; do + read -p "Enter name server address list:" namesrvAddr +done + +clusterName= +while [ -z "${clusterName}" ]; do + read -p "Choose a cluster to export:" clusterName +done + +read -p "Enter file path to export [default /tmp/rocketmq/export]:" filePath +if [ -z "${filePath}" ]; then + filePath="/tmp/rocketmq/config" +fi + +if [[ -e ${filePath} ]]; then + rm -rf ${filePath} +fi + +sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetrics -c ${clusterName} -n ${namesrvAddr} -f ${filePath} +sh ${ROCKETMQ_HOME}/bin/mqadmin exportConfigs -c ${clusterName} -n ${namesrvAddr} -f ${filePath} +sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetadata -c ${clusterName} -n ${namesrvAddr} -f ${filePath} + +cd ${filePath} || exit + +configs=$(cat ./configs.json) +if [ -z "$configs" ]; then + configs="{}" +fi +metadata=$(cat ./metadata.json) +if [ -z "$metadata" ]; then + metadata="{}" +fi +metrics=$(cat ./metrics.json) +if [ -z "$metrics" ]; then + metrics="{}" +fi + +echo "{ + \"configs\": ${configs}, + \"metadata\": ${metadata}, + \"metrics\": ${metrics} + }" >rocketmq-metadata-export.json + +echo -e "[INFO] The RocketMQ metadata has been exported to the file:${filePath}/rocketmq-metadata-export.json" diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md index bd0fc80aec57f53f982d6e37df06b5747ff6f308..e26bcdf043d4b826181e276410bb42dde67353c7 100644 --- a/docs/cn/RocketMQ_Example.md +++ b/docs/cn/RocketMQ_Example.md @@ -572,7 +572,7 @@ public class ListSplitter implements Iterator> { return currIndex; } private int calcMessageSize(Message message) { - int tmpSize = message.getTopic().length() + message.getBody().length(); + int tmpSize = message.getTopic().length() + message.getBody().length; Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); diff --git a/docs/en/Example_Batch.md b/docs/en/Example_Batch.md index 11bb42ffbe27b28046795892c30b7adcf14f8213..06461bb51d75ec02869453e735fb0d19558bb1f6 100644 --- a/docs/en/Example_Batch.md +++ b/docs/en/Example_Batch.md @@ -57,7 +57,7 @@ public class ListSplitter implements Iterator> { return currIndex; } private int calcMessageSize(Message message) { - int tmpSize = message.getTopic().length() + message.getBody().length(); + int tmpSize = message.getTopic().length() + message.getBody().length; Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); @@ -78,4 +78,4 @@ while (splitter.hasNext()) { // handle the error } } -``` \ No newline at end of file +``` diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java index c6c706b4077f5c37fe666c0f72a847fa9125d68e..4bc63383954ec3c44db62b9b6398bb5dfba41c89 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java @@ -36,7 +36,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public class PullConsumer { public static void main(String[] args) throws MQClientException { - + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); Set topics = new HashSet<>(); @@ -46,99 +46,101 @@ public class PullConsumer { consumer.start(); ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { + @Override + public Thread newThread(Runnable r) { return new Thread(r, "PullConsumerThread"); } - }); - for(String topic : consumer.getRegisterTopics()){ - - executors.execute(new Runnable() { - - public void doSomething(List msgs){ - //do you business - System.out.println(msgs); - } - @Override - public void run() { - while(true){ - try { - Set messageQueues = consumer.fetchMessageQueuesInBalance(topic); - if(messageQueues == null || messageQueues.isEmpty()){ - Thread.sleep(1000); - continue; - } - PullResult pullResult = null; - for(MessageQueue messageQueue : messageQueues){ - try { - long offset = this.consumeFromOffset(messageQueue); - pullResult = consumer.pull(messageQueue, "*", offset, 32); - switch (pullResult.getPullStatus()) { - case FOUND: - List msgs = pullResult.getMsgFoundList(); - - if(msgs != null && !msgs.isEmpty()){ - this.doSomething(msgs); - //update offset to broker - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - //print pull tps - this.incPullTPS(topic, pullResult.getMsgFoundList().size()); - } - break; - case OFFSET_ILLEGAL: - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - break; - case NO_NEW_MSG: - Thread.sleep(1); - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - break; - case NO_MATCHED_MSG: - consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); - break; - default: - } - } catch (RemotingException e) { - e.printStackTrace(); - } catch (MQBrokerException e) { - e.printStackTrace(); - } catch (Exception e){ - e.printStackTrace(); - } - } - } catch (MQClientException e) { - //reblance error - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (Exception e){ - e.printStackTrace(); - } - } - } - - public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException{ - //-1 when started - long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if(offset < 0){ - //query from broker - offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); - } - if (offset < 0){ - //first time start from last offset - offset = consumer.maxOffset(messageQueue); + }); + for (String topic : consumer.getRegisterTopics()) { + + executors.execute(new Runnable() { + + public void doSomething(List msgs) { + //do you business + + } + + @Override + public void run() { + while (true) { + try { + Set messageQueues = consumer.fetchMessageQueuesInBalance(topic); + if (messageQueues == null || messageQueues.isEmpty()) { + Thread.sleep(1000); + continue; + } + PullResult pullResult = null; + for (MessageQueue messageQueue : messageQueues) { + try { + long offset = this.consumeFromOffset(messageQueue); + pullResult = consumer.pull(messageQueue, "*", offset, 32); + switch (pullResult.getPullStatus()) { + case FOUND: + List msgs = pullResult.getMsgFoundList(); + + if (msgs != null && !msgs.isEmpty()) { + this.doSomething(msgs); + //update offset to broker + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + //print pull tps + this.incPullTPS(topic, pullResult.getMsgFoundList().size()); + } + break; + case OFFSET_ILLEGAL: + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + break; + case NO_NEW_MSG: + Thread.sleep(1); + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + break; + case NO_MATCHED_MSG: + consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); + break; + default: + } + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } catch (MQClientException e) { + //reblance error + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException { + //-1 when started + long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); + if (offset < 0) { + //query from broker + offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE); + } + if (offset < 0) { + //first time start from last offset + offset = consumer.maxOffset(messageQueue); } //make sure - if (offset < 0){ - offset = 0; + if (offset < 0) { + offset = 0; } - return offset; - } - public void incPullTPS(String topic, int pullSize) { - consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory() - .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize); - } - }); - + return offset; + } + + public void incPullTPS(String topic, int pullSize) { + consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory() + .getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize); + } + }); + } // executors.shutdown(); // consumer.shutdown(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index d936c3bf696ab0399a323ea028e9ee12ee46ce5a..3914314f0e2ad4768da0d3e8a9caf5e2de442e9b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -165,6 +165,10 @@ public class RemotingUtil { return sb.toString(); } + public static String convert2IpString(final String addr) { + return socketAddress2String(string2SocketAddress(addr)); + } + private static boolean isBridge(NetworkInterface networkInterface) { try { if (isLinuxPlatform()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index a234d129bf239586fcf195fad03c5f661976ea10..604d9c226c33496357f57c32bbf73138d63991b2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -22,10 +22,12 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -43,6 +45,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.schedule.ScheduleMessageService; @@ -71,9 +74,20 @@ public class CommitLog { protected final PutMessageLock putMessageLock; + private volatile Set fullStorePaths = Collections.emptySet(); + public CommitLog(final DefaultMessageStore defaultMessageStore) { - this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), - defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); + String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(); + if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) { + this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(), + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths); + } else { + this.mappedFileQueue = new MappedFileQueue(storePath, + defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), + defaultMessageStore.getAllocateMappedFileService()); + } + this.defaultMessageStore = defaultMessageStore; if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -95,6 +109,14 @@ public class CommitLog { } + public void setFullStorePaths(Set fullStorePaths) { + this.fullStorePaths = fullStorePaths; + } + + public Set getFullStorePaths() { + return fullStorePaths; + } + public boolean load() { boolean result = this.mappedFileQueue.load(); log.info("load commit log " + (result ? "OK" : "Failed")); diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 152af7b091be4610f48689f6a19ee568c7bf33e4..0061369846fc9bed77646a63bf7aebfdd4220f2d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileLock; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -167,6 +168,8 @@ public class DefaultMessageStore implements MessageStore { File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir())); MappedFile.ensureDirOK(file.getParent()); + MappedFile.ensureDirOK(getStorePathPhysic()); + MappedFile.ensureDirOK(getStorePathLogic()); lockFile = new RandomAccessFile(file, "rw"); } @@ -190,10 +193,6 @@ public class DefaultMessageStore implements MessageStore { boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); - if (null != scheduleMessageService) { - result = result && this.scheduleMessageService.load(); - } - // load Commit Log result = result && this.commitLog.load(); @@ -209,7 +208,12 @@ public class DefaultMessageStore implements MessageStore { this.recover(lastExitOK); log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); + + if (null != scheduleMessageService) { + result = this.scheduleMessageService.load(); + } } + } catch (Exception e) { log.error("load exception", e); result = false; @@ -539,7 +543,8 @@ public class DefaultMessageStore implements MessageStore { long minOffset = 0; long maxOffset = 0; - GetMessageResult getResult = new GetMessageResult(); + // lazy init when find msg. + GetMessageResult getResult = null; final long maxOffsetPy = this.commitLog.getMaxOffset(); @@ -576,6 +581,9 @@ public class DefaultMessageStore implements MessageStore { int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); + + getResult = new GetMessageResult(maxMsgNums); + ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); @@ -679,6 +687,11 @@ public class DefaultMessageStore implements MessageStore { long elapsedTime = this.getSystemClock().now() - beginTime; this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime); + // lazy init no data found. + if (getResult == null) { + getResult = new GetMessageResult(0); + } + getResult.setStatus(status); getResult.setNextBeginOffset(nextBeginOffset); getResult.setMaxOffset(maxOffset); @@ -782,8 +795,8 @@ public class DefaultMessageStore implements MessageStore { return this.storeStatsService.toString(); } - private String getStorePathPhysic() { - String storePathPhysic = ""; + public String getStorePathPhysic() { + String storePathPhysic; if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()) { storePathPhysic = ((DLedgerCommitLog)DefaultMessageStore.this.getCommitLog()).getdLedgerServer().getdLedgerConfig().getDataStorePath(); } else { @@ -792,20 +805,29 @@ public class DefaultMessageStore implements MessageStore { return storePathPhysic; } + public String getStorePathLogic() { + return StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); + } + @Override public HashMap getRuntimeInfo() { HashMap result = this.storeStatsService.getRuntimeInfo(); { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); - result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio)); - + double minPhysicsUsedRatio = Double.MAX_VALUE; + String commitLogStorePath = getStorePathPhysic(); + String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (String clPath : paths) { + double physicRatio = UtilAll.isPathExists(clPath) ? + UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1; + result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio)); + minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio); + } + result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio)); } { - - String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()); - double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); + double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathLogic()); result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio)); } @@ -1681,25 +1703,43 @@ public class DefaultMessageStore implements MessageStore { cleanImmediately = false; { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); - if (physicRatio > diskSpaceWarningLevelRatio) { + String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); + String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + Set fullStorePath = new HashSet<>(); + double minPhysicRatio = 100; + String minStorePath = null; + for (String storePathPhysic : storePaths) { + double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); + if (minPhysicRatio > physicRatio) { + minPhysicRatio = physicRatio; + minStorePath = storePathPhysic; + } + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(storePathPhysic); + } + } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + if (minPhysicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { - DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); + DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio + + ", so mark disk full, storePathPhysic=" + minStorePath); } cleanImmediately = true; - } else if (physicRatio > diskSpaceCleanForciblyRatio) { + } else if (minPhysicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { - DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); + DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + + ", so mark disk ok, storePathPhysic=" + minStorePath); } } - if (physicRatio < 0 || physicRatio > ratio) { - DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); + if (minPhysicRatio < 0 || minPhysicRatio > ratio) { + DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + + minPhysicRatio + ", storePathPhysic=" + minStorePath); return true; } } @@ -1740,8 +1780,27 @@ public class DefaultMessageStore implements MessageStore { public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; } + + public double calcStorePathPhysicRatio() { + Set fullStorePath = new HashSet<>(); + String storePath = getStorePathPhysic(); + String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + double minPhysicRatio = 100; + for (String path : paths) { + double physicRatio = UtilAll.isPathExists(path) ? + UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1; + minPhysicRatio = Math.min(minPhysicRatio, physicRatio); + if (physicRatio > diskSpaceCleanForciblyRatio) { + fullStorePath.add(path); + } + } + DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath); + return minPhysicRatio; + + } + public boolean isSpaceFull() { - double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); + double physicRatio = calcStorePathPhysicRatio(); double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; if (physicRatio > ratio) { DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java index 6fcb3101a139ac8d5b1ef96866288b8f513801ca..fafff6bee505888e9f8da36b3a2276ccf07b0151 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java @@ -23,11 +23,9 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; public class GetMessageResult { - private final List messageMapedList = - new ArrayList(100); - - private final List messageBufferList = new ArrayList(100); - private final List messageQueueOffset = new ArrayList<>(100); + private final List messageMapedList; + private final List messageBufferList; + private final List messageQueueOffset; private GetMessageStatus status; private long nextBeginOffset; @@ -41,6 +39,15 @@ public class GetMessageResult { private int msgCount4Commercial = 0; public GetMessageResult() { + messageMapedList = new ArrayList<>(100); + messageBufferList = new ArrayList<>(100); + messageQueueOffset = new ArrayList<>(100); + } + + public GetMessageResult(int resultSize) { + messageMapedList = new ArrayList<>(resultSize); + messageBufferList = new ArrayList<>(resultSize); + messageQueueOffset = new ArrayList<>(resultSize); } public GetMessageStatus getStatus() { diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index ac7894d2dc285f7889cc8fcd3513699e8c448fd2..1aa9ef83075b6961525b1c04e63e74c726b5f0a4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -37,13 +38,13 @@ public class MappedFileQueue { private final String storePath; - private final int mappedFileSize; + protected final int mappedFileSize; - private final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); + protected final CopyOnWriteArrayList mappedFiles = new CopyOnWriteArrayList(); private final AllocateMappedFileService allocateMappedFileService; - private long flushedWhere = 0; + protected long flushedWhere = 0; private long committedWhere = 0; private volatile long storeTimestamp = 0; @@ -144,35 +145,40 @@ public class MappedFileQueue { } } + public boolean load() { File dir = new File(this.storePath); - File[] files = dir.listFiles(); - if (files != null) { - // ascending order - Arrays.sort(files); - for (File file : files) { - - if (file.length() != this.mappedFileSize) { - log.warn(file + "\t" + file.length() - + " length not matched message store config value, please check it manually"); - return false; - } + File[] ls = dir.listFiles(); + if (ls != null) { + return doLoad(Arrays.asList(ls)); + } + return true; + } - try { - MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); - - mappedFile.setWrotePosition(this.mappedFileSize); - mappedFile.setFlushedPosition(this.mappedFileSize); - mappedFile.setCommittedPosition(this.mappedFileSize); - this.mappedFiles.add(mappedFile); - log.info("load " + file.getPath() + " OK"); - } catch (IOException e) { - log.error("load file " + file + " error", e); - return false; - } + public boolean doLoad(List files) { + // ascending order + files.sort(Comparator.comparing(File::getName)); + + for (File file : files) { + if (file.length() != this.mappedFileSize) { + log.warn(file + "\t" + file.length() + + " length not matched message store config value, ignore it"); + return true; } - } + try { + MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); + + mappedFile.setWrotePosition(this.mappedFileSize); + mappedFile.setFlushedPosition(this.mappedFileSize); + mappedFile.setCommittedPosition(this.mappedFileSize); + this.mappedFiles.add(mappedFile); + log.info("load " + file.getPath() + " OK"); + } catch (IOException e) { + log.error("load file " + file + " error", e); + return false; + } + } return true; } @@ -204,33 +210,41 @@ public class MappedFileQueue { } if (createOffset != -1 && needCreate) { - String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); - String nextNextFilePath = this.storePath + File.separator - + UtilAll.offset2FileName(createOffset + this.mappedFileSize); - MappedFile mappedFile = null; + return tryCreateMappedFile(createOffset); + } - if (this.allocateMappedFileService != null) { - mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, + return mappedFileLast; + } + + protected MappedFile tryCreateMappedFile(long createOffset) { + String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset + + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } + + protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) { + MappedFile mappedFile = null; + + if (this.allocateMappedFileService != null) { + mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this.mappedFileSize); - } else { - try { - mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); - } catch (IOException e) { - log.error("create mappedFile exception", e); - } + } else { + try { + mappedFile = new MappedFile(nextFilePath, this.mappedFileSize); + } catch (IOException e) { + log.error("create mappedFile exception", e); } + } - if (mappedFile != null) { - if (this.mappedFiles.isEmpty()) { - mappedFile.setFirstCreateInQueue(true); - } - this.mappedFiles.add(mappedFile); + if (mappedFile != null) { + if (this.mappedFiles.isEmpty()) { + mappedFile.setFirstCreateInQueue(true); } - - return mappedFile; + this.mappedFiles.add(mappedFile); } - return mappedFileLast; + return mappedFile; } public MappedFile getLastMappedFile(final long startOffset) { diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..669698ff4e99c5e1e5b43a4baefa3ac70213407a --- /dev/null +++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store; + + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class MultiPathMappedFileQueue extends MappedFileQueue { + + private final MessageStoreConfig config; + private final Supplier> fullStorePathsSupplier; + + public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize, + AllocateMappedFileService allocateMappedFileService, + Supplier> fullStorePathsSupplier) { + super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService); + this.config = messageStoreConfig; + this.fullStorePathsSupplier = fullStorePathsSupplier; + } + + private Set getPaths() { + String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + return new HashSet<>(Arrays.asList(paths)); + } + + private Set getReadonlyPaths() { + String pathStr = config.getReadOnlyCommitLogStorePaths(); + if (StringUtils.isBlank(pathStr)) { + return Collections.emptySet(); + } + String[] paths = pathStr.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + return new HashSet<>(Arrays.asList(paths)); + } + + @Override + public boolean load() { + Set storePathSet = getPaths(); + storePathSet.addAll(getReadonlyPaths()); + + List files = new ArrayList<>(); + for (String path : storePathSet) { + File dir = new File(path); + File[] ls = dir.listFiles(); + if (ls != null) { + Collections.addAll(files, ls); + } + } + + return doLoad(files); + } + + @Override + protected MappedFile tryCreateMappedFile(long createOffset) { + long fileIdx = createOffset / this.mappedFileSize; + Set storePath = getPaths(); + Set readonlyPathSet = getReadonlyPaths(); + Set fullStorePaths = + fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get(); + + + HashSet availableStorePath = new HashSet<>(storePath); + //do not create file in readonly store path. + availableStorePath.removeAll(readonlyPathSet); + + //do not create file is space is nearly full. + availableStorePath.removeAll(fullStorePaths); + + //if no store path left, fall back to writable store path. + if (availableStorePath.isEmpty()) { + availableStorePath = new HashSet<>(storePath); + availableStorePath.removeAll(readonlyPathSet); + } + + String[] paths = availableStorePath.toArray(new String[]{}); + Arrays.sort(paths); + String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator + + UtilAll.offset2FileName(createOffset); + String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator + + UtilAll.offset2FileName(createOffset + this.mappedFileSize); + return doCreateMappedFile(nextFilePath, nextNextFilePath); + } + + @Override + public void destroy() { + for (MappedFile mf : this.mappedFiles) { + mf.destroy(1000 * 3); + } + this.mappedFiles.clear(); + this.flushedWhere = 0; + + + Set storePathSet = getPaths(); + storePathSet.addAll(getReadonlyPaths()); + + for (String path : storePathSet) { + File file = new File(path); + if (file.isDirectory()) { + file.delete(); + } + } + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 848fe22decdb211064003ae1e7c6b7cf832da55d..e1439a0c9d8260be9c7e9fe949b364726d9f84b2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -17,10 +17,14 @@ package org.apache.rocketmq.store.config; import java.io.File; + import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.store.ConsumeQueue; public class MessageStoreConfig { + + public static final String MULTI_PATH_SPLITTER = System.getProperty("rocketmq.broker.multiPathSplitter", ","); + //The root directory in which the log data is kept @ImportantField private String storePathRootDir = System.getProperty("user.home") + File.separator + "store"; @@ -30,6 +34,8 @@ public class MessageStoreConfig { private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog"; + private String readOnlyCommitLogStorePaths = null; + // CommitLog file size,default is 1G private int mappedFileSizeCommitLog = 1024 * 1024 * 1024; // ConsumeQueue file size,default is 30W @@ -676,6 +682,13 @@ public class MessageStoreConfig { this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval; } + public String getReadOnlyCommitLogStorePaths() { + return readOnlyCommitLogStorePaths; + } + + public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) { + this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths; + } public String getdLegerGroup() { return dLegerGroup; } diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index c127515ee224813a61640343472b0519e2591599..62b0b9ef8a15d006bab5ef81f68d437c34f0be3b 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -81,8 +81,7 @@ public class ScheduleMessageService extends ConfigManager { } /** - * @param writeMessageStore - * the writeMessageStore to set + * @param writeMessageStore the writeMessageStore to set */ public void setWriteMessageStore(MessageStore writeMessageStore) { this.writeMessageStore = writeMessageStore; @@ -136,7 +135,9 @@ public class ScheduleMessageService extends ConfigManager { @Override public void run() { try { - if (started.get()) ScheduleMessageService.this.persist(); + if (started.get()) { + ScheduleMessageService.this.persist(); + } } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } @@ -168,9 +169,46 @@ public class ScheduleMessageService extends ConfigManager { public boolean load() { boolean result = super.load(); result = result && this.parseDelayLevel(); + result = result && this.correctDelayOffset(); return result; } + public boolean correctDelayOffset() { + try { + for (int delayLevel : delayLevelTable.keySet()) { + ConsumeQueue cq = + ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, + delayLevel2QueueId(delayLevel)); + Long currentDelayOffset = offsetTable.get(delayLevel); + if (currentDelayOffset == null || cq == null) { + continue; + } + long correctDelayOffset = currentDelayOffset; + long cqMinOffset = cq.getMinOffsetInQueue(); + long cqMaxOffset = cq.getMaxOffsetInQueue(); + if (currentDelayOffset < cqMinOffset) { + correctDelayOffset = cqMinOffset; + log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", + currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId()); + } + + if (currentDelayOffset > cqMaxOffset) { + correctDelayOffset = cqMaxOffset; + log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", + currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId()); + } + if (correctDelayOffset != currentDelayOffset) { + log.error("correct delay offset [ delayLevel {} ] from {} to {}", delayLevel, currentDelayOffset, correctDelayOffset); + offsetTable.put(delayLevel, correctDelayOffset); + } + } + } catch (Exception e) { + log.error("correctDelayOffset exception", e); + return false; + } + return true; + } + @Override public String configFilePath() { return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig() @@ -323,7 +361,7 @@ public class ScheduleMessageService extends ConfigManager { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) { log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}", - msgInner.getTopic(), msgInner); + msgInner.getTopic(), msgInner); continue; } PutMessageResult putMessageResult = @@ -358,14 +396,9 @@ public class ScheduleMessageService extends ConfigManager { } catch (Exception e) { /* * XXX: warn and notify me - - - */ log.error( - "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" - + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" - + offsetPy + ",sizePy=" + sizePy, e); + "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e); } } } else { @@ -390,10 +423,17 @@ public class ScheduleMessageService extends ConfigManager { else { long cqMinOffset = cq.getMinOffsetInQueue(); + long cqMaxOffset = cq.getMaxOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; - log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" - + cqMinOffset + ", queueId=" + cq.getQueueId()); + log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", + offset, cqMinOffset, cqMaxOffset, cq.getQueueId()); + } + + if (offset > cqMaxOffset) { + failScheduleOffset = cqMaxOffset; + log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}", + offset, cqMinOffset, cqMaxOffset, cq.getQueueId()); } } } // end of if (cq != null) diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java index 7455dfea3c811381d86a7e733aea0394479eec7e..69c1673ee48b792b7b63513297a4e851ff2a7867 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java @@ -94,6 +94,41 @@ public class DefaultMessageStoreCleanFilesTest { } + @Test + public void testIsSpaceFullMultiCommitLogStorePath() throws Exception { + String deleteWhen = "04"; + // the min value of diskMaxUsedSpaceRatio. + int diskMaxUsedSpaceRatio = 1; + // used to set disk-full flag + double diskSpaceCleanForciblyRatio = 0.01D; + MessageStoreConfig config = genMessageStoreConfig(deleteWhen, diskMaxUsedSpaceRatio); + String storePath = config.getStorePathCommitLog(); + StringBuilder storePathBuilder = new StringBuilder(); + for (int i = 0; i < 3; i++) { + storePathBuilder.append(storePath).append(i).append(MessageStoreConfig.MULTI_PATH_SPLITTER); + } + config.setStorePathCommitLog(storePathBuilder.toString()); + String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + assertEquals(3, paths.length); + initMessageStore(config, diskSpaceCleanForciblyRatio); + + + + // build and put 55 messages, exactly one message per CommitLog file. + buildAndPutMessagesToMessageStore(msgCount); + MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog(); + assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size()); + int fileCountConsumeQueue = getFileCountConsumeQueue(); + MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue(); + assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size()); + cleanCommitLogService.isSpaceFull(); + + assertEquals(1 << 4, messageStore.getRunningFlags().getFlagBits() & (1 << 4)); + messageStore.shutdown(); + messageStore.destroy(); + + } + @Test public void testIsSpaceFullFunctionFull2Empty() throws Exception { String deleteWhen = "04"; @@ -421,6 +456,10 @@ public class DefaultMessageStoreCleanFilesTest { } private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception { + initMessageStore(genMessageStoreConfig(deleteWhen,diskMaxUsedSpaceRatio), diskSpaceCleanForciblyRatio); + } + + private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxUsedSpaceRatio) { MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest(); messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize); @@ -442,7 +481,10 @@ public class DefaultMessageStoreCleanFilesTest { String storePathCommitLog = storePathRootDir + File.separator + "commitlog"; messageStoreConfig.setStorePathRootDir(storePathRootDir); messageStoreConfig.setStorePathCommitLog(storePathCommitLog); + return messageStoreConfig; + } + private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception { messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig()); diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index 440f9146314a049a91e7096f4b382399b878f8d6..f3e619d7784595280d3a54c239b3136b7ea97199 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -605,6 +605,22 @@ public class DefaultMessageStoreTest { } } + @Test + public void testStorePathOK() { + if (messageStore instanceof DefaultMessageStore) { + assertTrue(fileExists(((DefaultMessageStore) messageStore).getStorePathPhysic())); + assertTrue(fileExists(((DefaultMessageStore) messageStore).getStorePathLogic())); + } + } + + private boolean fileExists(String path) { + if (path != null) { + File f = new File(path); + return f.exists(); + } + return false; + } + private void damageCommitlog(long offset) throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000"); diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java new file mode 100644 index 0000000000000000000000000000000000000000..66b3f93b01f8db4449b427dfdeaa087523254d7c --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.HashSet; +import java.util.Set; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.Test; + + +public class MultiPathMappedFileQueueTest { + + @Test + public void testGetLastMappedFile() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + + @Test + public void testLoadReadOnlyMappedFiles() { + { + //create old mapped files + final byte[] fixedMsg = new byte[1024]; + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + } + mappedFileQueue.shutdown(1000); + } + + // test load and readonly + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/b/"); + config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c"); + MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + + mappedFileQueue.load(); + + assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024); + for (int i = 0; i < 1024; i++) { + assertThat(mappedFileQueue.mappedFiles.get(i).getFile().getName()) + .isEqualTo(UtilAll.offset2FileName(1024 * i)); + } + mappedFileQueue.destroy(); + + } + + @Test + public void testUpdatePathsOnline() { + final byte[] fixedMsg = new byte[1024]; + + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + for (int i = 0; i < 1024; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + int idx = i % storePaths.length; + assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue(); + + if (i == 500) { + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/"); + storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + } + } + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } + + @Test + public void testFullStorePath() { + final byte[] fixedMsg = new byte[1024]; + + Set fullStorePath = new HashSet<>(); + MessageStoreConfig config = new MessageStoreConfig(); + config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER + + "target/unit_test_store/c/"); + MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, () -> fullStorePath); + String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER); + assertThat(storePaths.length).isEqualTo(3); + + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); + assertThat(mappedFile).isNotNull(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue(); + + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length); + assertThat(mappedFile.getFileName().startsWith(storePaths[1])).isTrue(); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 2); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue(); + + fullStorePath.add("target/unit_test_store/b/"); + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 3); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue(); + + mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 4); + assertThat(mappedFile.appendMessage(fixedMsg)).isTrue(); + assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue(); + + mappedFileQueue.shutdown(1000); + mappedFileQueue.destroy(); + } +} \ No newline at end of file diff --git a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8502521fe8e7108240ffa07f1a3c69a66a18dbe6 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.store; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.store.config.FlushDiskType; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.schedule.ScheduleMessageService; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ScheduleMessageServiceTest { + + private Random random = new Random(); + + @Test + public void testCorrectDelayOffset_whenInit() throws Exception { + + ConcurrentMap offsetTable = null; + + ScheduleMessageService scheduleMessageService = new ScheduleMessageService((DefaultMessageStore) buildMessageStore()); + scheduleMessageService.parseDelayLevel(); + + ConcurrentMap offsetTable1 = new ConcurrentHashMap<>(); + for (int i = 1; i <= 18; i++) { + offsetTable1.put(i, random.nextLong()); + } + + Field field = scheduleMessageService.getClass().getDeclaredField("offsetTable"); + field.setAccessible(true); + field.set(scheduleMessageService, offsetTable1); + + String jsonStr = scheduleMessageService.encode(); + scheduleMessageService.decode(jsonStr); + + offsetTable = (ConcurrentMap) field.get(scheduleMessageService); + + for (Map.Entry entry : offsetTable.entrySet()) { + assertEquals(entry.getValue(), offsetTable1.get(entry.getKey())); + } + + scheduleMessageService.correctDelayOffset(); + + offsetTable = (ConcurrentMap) field.get(scheduleMessageService); + + for (long offset : offsetTable.values()) { + assertEquals(offset, 0); + } + + } + + private MessageStore buildMessageStore() throws Exception { + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); + messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10); + messageStoreConfig.setMaxHashSlotNum(10000); + messageStoreConfig.setMaxIndexNum(100 * 100); + messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH); + messageStoreConfig.setFlushIntervalConsumeQueue(1); + return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig()); + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index bcc8023a767f80f80a7909c3f4eb689d81bd63ac..4322e3178e98f6754814f44c204a1d8c9e826e63 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -33,7 +33,6 @@ import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.protocol.body.BrokerStatsData; import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo; @@ -43,16 +42,12 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.KVTable; -import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; -import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData; -import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo; -import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.common.topic.TopicValidator; @@ -507,10 +502,24 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { - return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis); + return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException { + return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis); + } + + @Override + public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic, + long timeoutMillis) throws InterruptedException, RemotingException, + MQBrokerException, MQClientException { + return this.defaultMQAdminExtImpl.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis); } /* (non-Javadoc) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index f38c3c643cfdb56f7d77b6891cba1ae7ec0041e2..2f2e49130a9d89956430b436a27f5f644608ae0e 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -104,6 +104,24 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { private long timeoutMillis = 20000; private Random random = new Random(); + private static final Set SYSTEM_GROUP_SET = new HashSet(); + + static { + SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_PRODUCER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP); + SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS); + } + public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) { this(defaultMQAdminExt, null, timeoutMillis); } @@ -941,7 +959,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) { BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName()); if (brokerData != null) { - String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); + String addr = RemotingUtil.convert2IpString(brokerData.getBrokerAddrs().get(MixAll.MASTER_ID)); if (RemotingUtil.socketAddress2String(msg.getStoreHost()).equals(addr)) { if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) { return true; @@ -1016,12 +1034,49 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr, + long timeoutMillis) throws InterruptedException, + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl() + .getAllSubscriptionGroup(brokerAddr, timeoutMillis); + + Iterator> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable() + .entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry configEntry = iterator.next(); + if (MixAll.isSysConsumerGroup(configEntry.getKey()) || SYSTEM_GROUP_SET.contains(configEntry.getKey())) { + iterator.remove(); + } + } + + return subscriptionGroupWrapper; + } + + @Override + public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis); } + @Override + public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic, + long timeoutMillis) throws InterruptedException, RemotingException, + MQBrokerException, MQClientException { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis); + TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr, + timeoutMillis); + Iterator> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet() + .iterator(); + while (iterator.hasNext()) { + String topic = iterator.next().getKey(); + if (topicList.getTopicList().contains(topic) || (!specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)))) { + iterator.remove(); + } + } + return topicConfigSerializeWrapper; + } + @Override public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, newTopic, queueNum, 0); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 0da449d0257af0727577e1aecb77af73b00afcbb..5c53ead609255dcfa6bc0fe1c8d1fbd6e9f9a233 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -247,10 +247,18 @@ public interface MQAdminExt extends MQAdmin { long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException; - TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException; + TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + RemotingConnectException, MQBrokerException; + + TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic, + long timeoutMillis) throws InterruptedException, RemotingException, + MQBrokerException, MQClientException; + void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java index 2e65f980848a9ef553dc5ec593ad4ef68cde1b6b..8984ca67b2029350e434d0214c2f5358058a28db 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java @@ -146,5 +146,4 @@ public class CommandUtil { } throw new Exception(ERROR_MESSAGE); } - } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 1d299598a28bb46ba5d15a5e28dccd9203d81aa5..50ca1b8805d2ef8811118046ee52f6f64f3c20ca 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -58,6 +58,8 @@ import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCo import org.apache.rocketmq.tools.command.logicalqueue.QueryTopicLogicalQueueMappingCommand; import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand; import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand; +import org.apache.rocketmq.tools.command.export.ExportMetricsCommand; +import org.apache.rocketmq.tools.command.export.ExportConfigsCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand; import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand; @@ -80,6 +82,7 @@ import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand; import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand; import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand; +import org.apache.rocketmq.tools.command.export.ExportMetadataCommand; import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand; import org.apache.rocketmq.tools.command.topic.TopicListSubCommand; import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand; @@ -160,7 +163,6 @@ public class MQAdminStartup { initCommand(new UpdateTopicSubCommand()); initCommand(new DeleteTopicSubCommand()); initCommand(new UpdateSubGroupSubCommand()); - initCommand(new SetConsumeModeSubCommand()); initCommand(new DeleteSubscriptionGroupCommand()); initCommand(new UpdateBrokerConfigSubCommand()); initCommand(new UpdateTopicPermSubCommand()); @@ -230,6 +232,10 @@ public class MQAdminStartup { initCommand(new QueryTopicLogicalQueueMappingCommand()); initCommand(new MigrateTopicLogicalQueueCommand()); initCommand(new UpdateTopicLogicalQueueNumCommand()); + + initCommand(new ExportMetadataCommand()); + initCommand(new ExportConfigsCommand()); + initCommand(new ExportMetricsCommand()); } private static void initLogback() throws JoranException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..c3ca9d31902c02bfdf19cc3cc1668dc0ec745096 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.alibaba.fastjson.JSON; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class ExportConfigsCommand implements SubCommand { + @Override + public String commandName() { + return "exportConfigs"; + } + + @Override + public String commandDesc() { + return "export configs"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterName", true, "choose a cluster to export"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("f", "filePath", true, + "export configs.json path | default /tmp/rocketmq/export"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) + throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + String clusterName = commandLine.getOptionValue('c').trim(); + String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f') + .trim(); + + defaultMQAdminExt.start(); + Map result = new HashMap<>(); + // name servers + List nameServerAddressList = defaultMQAdminExt.getNameServerAddressList(); + + //broker + int masterBrokerSize = 0; + int slaveBrokerSize = 0; + Map brokerConfigs = new HashMap<>(); + Map> masterAndSlaveMap + = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName); + for (String masterAddr : masterAndSlaveMap.keySet()) { + Properties masterProperties = defaultMQAdminExt.getBrokerConfig(masterAddr); + masterBrokerSize++; + slaveBrokerSize += masterAndSlaveMap.get(masterAddr).size(); + + brokerConfigs.put(masterProperties.getProperty("brokerName"), needBrokerProprties(masterProperties)); + } + + Map clusterScaleMap = new HashMap<>(); + clusterScaleMap.put("namesrvSize", nameServerAddressList.size()); + clusterScaleMap.put("masterBrokerSize", masterBrokerSize); + clusterScaleMap.put("slaveBrokerSize", slaveBrokerSize); + + result.put("brokerConfigs", brokerConfigs); + result.put("clusterScale", clusterScaleMap); + + String path = filePath + "/configs.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path); + System.out.printf("export %s success", path); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private Properties needBrokerProprties(Properties properties) { + Properties newProperties = new Properties(); + newProperties.setProperty("brokerClusterName", properties.getProperty("brokerClusterName")); + newProperties.setProperty("brokerId", properties.getProperty("brokerId")); + newProperties.setProperty("brokerName", properties.getProperty("brokerName")); + newProperties.setProperty("brokerRole", properties.getProperty("brokerRole")); + newProperties.setProperty("fileReservedTime", properties.getProperty("fileReservedTime")); + newProperties.setProperty("filterServerNums", properties.getProperty("filterServerNums")); + newProperties.setProperty("flushDiskType", properties.getProperty("flushDiskType")); + newProperties.setProperty("maxMessageSize", properties.getProperty("maxMessageSize")); + newProperties.setProperty("messageDelayLevel", properties.getProperty("messageDelayLevel")); + newProperties.setProperty("msgTraceTopicName", properties.getProperty("msgTraceTopicName")); + newProperties.setProperty("slaveReadEnable", properties.getProperty("slaveReadEnable")); + newProperties.setProperty("traceOn", properties.getProperty("traceOn")); + newProperties.setProperty("traceTopicEnable", properties.getProperty("traceTopicEnable")); + newProperties.setProperty("useTLS", properties.getProperty("useTLS")); + newProperties.setProperty("autoCreateTopicEnable", properties.getProperty("autoCreateTopicEnable")); + newProperties.setProperty("autoCreateSubscriptionGroup", properties.getProperty("autoCreateSubscriptionGroup")); + return newProperties; + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..1909436467b05655e3a474f1f675e136749b3eab --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetadataCommand.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.alibaba.fastjson.JSON; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class ExportMetadataCommand implements SubCommand { + + private static final String DEFAULT_FILE_PATH = "/tmp/rocketmq/export"; + + @Override + public String commandName() { + return "exportMetadata"; + } + + @Override + public String commandDesc() { + return "export metadata"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterName", true, "choose a cluster to export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "brokerAddr", true, "choose a broker to export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "filePath", true, "export metadata.json path | default /tmp/rocketmq/export"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", false, "only export topic metadata"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("g", "subscriptionGroup", false, "only export subscriptionGroup metadata"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "specialTopic", false, "need retryTopic and dlqTopic"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) + throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + defaultMQAdminExt.start(); + + String filePath = !commandLine.hasOption('f') ? DEFAULT_FILE_PATH : commandLine.getOptionValue('f') + .trim(); + + boolean specialTopic = commandLine.hasOption('s'); + + if (commandLine.hasOption('b')) { + final String brokerAddr = commandLine.getOptionValue('b').trim(); + + if (commandLine.hasOption('t')) { + filePath = filePath + "/topic.json"; + TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig( + brokerAddr, specialTopic, 10000L); + MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath); + System.out.printf("export %s success", filePath); + } else if (commandLine.hasOption('g')) { + filePath = filePath + "/subscriptionGroup.json"; + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup( + brokerAddr, 10000L); + MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath); + System.out.printf("export %s success", filePath); + } + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + Set masterSet = + CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + + Map topicConfigMap = new HashMap<>(); + Map subGroupConfigMap = new HashMap<>(); + + for (String addr : masterSet) { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig( + addr, specialTopic, 10000L); + + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup( + addr, 10000); + + if (commandLine.hasOption('t')) { + filePath = filePath + "/topic.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath); + System.out.printf("export %s success", filePath); + return; + } else if (commandLine.hasOption('g')) { + filePath = filePath + "/subscriptionGroup.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath); + System.out.printf("export %s success", filePath); + return; + } else { + for (Map.Entry entry : topicConfigSerializeWrapper.getTopicConfigTable().entrySet()) { + TopicConfig topicConfig = topicConfigMap.get(entry.getKey()); + if (null != topicConfig) { + entry.getValue().setWriteQueueNums( + topicConfig.getWriteQueueNums() + entry.getValue().getWriteQueueNums()); + entry.getValue().setReadQueueNums( + topicConfig.getReadQueueNums() + entry.getValue().getReadQueueNums()); + } + topicConfigMap.put(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet()) { + + SubscriptionGroupConfig subscriptionGroupConfig = subGroupConfigMap.get(entry.getKey()); + if (null != subscriptionGroupConfig) { + entry.getValue().setRetryQueueNums( + subscriptionGroupConfig.getRetryQueueNums() + entry.getValue().getRetryQueueNums()); + } + subGroupConfigMap.put(entry.getKey(), entry.getValue()); + } + + Map result = new HashMap<>(); + result.put("topicConfigTable", topicConfigMap); + result.put("subscriptionGroupTable", subGroupConfigMap); + result.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); + result.put("exportTime", System.currentTimeMillis()); + + filePath = filePath + "/metadata.json"; + MixAll.string2FileNotSafe(JSON.toJSONString(result, true), filePath); + System.out.printf("export %s success", filePath); + } + + } + } else { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} + diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java new file mode 100644 index 0000000000000000000000000000000000000000..497083f09a7333af364ae3166a54fae738a96a20 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportMetricsCommand.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.export; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import com.alibaba.fastjson.JSON; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.Connection; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand; + +public class ExportMetricsCommand implements SubCommand { + + @Override + public String commandName() { + return "exportMetrics"; + } + + @Override + public String commandDesc() { + return "export metrics"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterName", true, "choose a cluster to export"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("f", "filePath", true, + "export metrics.json path | default /tmp/rocketmq/export"); + opt.setRequired(false); + options.addOption(opt); + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) + throws SubCommandException { + + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + String clusterName = commandLine.getOptionValue('c').trim(); + String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f') + .trim(); + + defaultMQAdminExt.start(); + + Map>> evaluateReportMap = new HashMap<>(); + Map totalTpsMap = new HashMap<>(); + Map totalOneDayNumMap = new HashMap<>(); + initTotalMap(totalTpsMap, totalOneDayNumMap); + + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + Set brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName); + for (String brokerName : brokerNameSet) { + BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + if (brokerData != null) { + String addr = brokerData.getBrokerAddrs().get(0L); + + KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(addr); + + Properties properties = defaultMQAdminExt.getBrokerConfig(addr); + + SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(addr, + 10000); + + Map> brokerInfo = new HashMap<>(); + + //broker environment,machine configuration + brokerInfo.put("runtimeEnv", getRuntimeEnv(kvTable, properties)); + + brokerInfo.put("runtimeQuota", + getRuntimeQuota(kvTable, defaultMQAdminExt, addr, totalTpsMap, + totalOneDayNumMap, subscriptionGroupWrapper)); + + // runtime version + brokerInfo.put("runtimeVersion", + getRuntimeVersion(defaultMQAdminExt, subscriptionGroupWrapper)); + + evaluateReportMap.put(brokerName, brokerInfo); + } + + } + + String path = filePath + "/metrics.json"; + + Map totalData = new HashMap<>(); + totalData.put("totalTps", totalTpsMap); + totalData.put("totalOneDayNum", totalOneDayNumMap); + + Map result = new HashMap<>(); + result.put("evaluateReport", evaluateReportMap); + result.put("totalData", totalData); + + MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path); + System.out.printf("export %s success", path); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + + } + + private Map getRuntimeVersion(DefaultMQAdminExt defaultMQAdminExt, + SubscriptionGroupWrapper subscriptionGroupWrapper) { + Map runtimeVersionMap = new HashMap(); + + Set clientInfoSet = new HashSet<>(); + for (Map.Entry entry : subscriptionGroupWrapper + .getSubscriptionGroupTable().entrySet()) { + try { + ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo( + entry.getValue().getGroupName()); + for (Connection conn : cc.getConnectionSet()) { + String clientInfo = conn.getLanguage() + "%" + MQVersion.getVersionDesc(conn.getVersion()); + clientInfoSet.add(clientInfo); + } + } catch (Exception e) { + continue; + } + } + runtimeVersionMap.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION)); + runtimeVersionMap.put("clientInfo", clientInfoSet); + return runtimeVersionMap; + } + + private Map getRuntimeEnv(KVTable kvTable, Properties properties) { + Map runtimeEnvMap = new HashMap<>(); + runtimeEnvMap.put("cpuNum", properties.getProperty("clientCallbackExecutorThreads")); + runtimeEnvMap.put("totalMemKBytes", kvTable.getTable().get("totalMemKBytes")); + return runtimeEnvMap; + } + + private Map getRuntimeQuota(KVTable kvTable, DefaultMQAdminExt defaultMQAdminExt, String brokerAddr, + Map totalTpsMap, Map totalOneDayNumMap, + SubscriptionGroupWrapper subscriptionGroupWrapper) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig( + brokerAddr, false, 10000); + + BrokerStatsData transStatsData = null; + + try { + transStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr, + BrokerStatsManager.TOPIC_PUT_NUMS, + TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC); + } catch (MQClientException e) { + } + + BrokerStatsData scheduleStatsData = null; + try { + scheduleStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr, + BrokerStatsManager.TOPIC_PUT_NUMS, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC); + } catch (MQClientException e) { + } + + Map runtimeQuotaMap = new HashMap<>(); + //disk use ratio + Map diskRatioMap = new HashMap<>(); + diskRatioMap.put("commitLogDiskRatio", kvTable.getTable().get("commitLogDiskRatio")); + diskRatioMap.put("consumeQueueDiskRatio", kvTable.getTable().get("consumeQueueDiskRatio")); + runtimeQuotaMap.put("diskRatio", diskRatioMap); + + //inTps and outTps + Map tpsMap = new HashMap<>(); + double normalInTps = 0; + double normalOutTps = 0; + String putTps = kvTable.getTable().get("putTps"); + String getTransferedTps = kvTable.getTable().get("getTransferedTps"); + String[] inTpss = putTps.split(" "); + if (inTpss.length > 0) { + normalInTps = Double.parseDouble(inTpss[0]); + } + + String[] outTpss = getTransferedTps.split(" "); + if (outTpss.length > 0) { + normalOutTps = Double.parseDouble(outTpss[0]); + } + + double transInTps = null != transStatsData ? transStatsData.getStatsMinute().getTps() : 0.0; + double scheduleInTps = null != scheduleStatsData ? scheduleStatsData.getStatsMinute().getTps() : 0.0; + + long transOneDayInNum = null != transStatsData ? StatsAllSubCommand.compute24HourSum(transStatsData) : 0; + long scheduleOneDayInNum = null != scheduleStatsData ? StatsAllSubCommand.compute24HourSum(scheduleStatsData) : 0; + + //current minute tps + tpsMap.put("normalInTps", normalInTps); + tpsMap.put("normalOutTps", normalOutTps); + tpsMap.put("transInTps", transInTps); + tpsMap.put("scheduleInTps", scheduleInTps); + runtimeQuotaMap.put("tps", tpsMap); + + //one day num + Map oneDayNumMap = new HashMap<>(); + long normalOneDayInNum = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayMorning")) - + Long.parseLong(kvTable.getTable().get("msgPutTotalYesterdayMorning")); + long normalOneDayOutNum = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayMorning")) - + Long.parseLong(kvTable.getTable().get("msgGetTotalYesterdayMorning")); + oneDayNumMap.put("normalOneDayInNum", normalOneDayInNum); + oneDayNumMap.put("normalOneDayOutNum", normalOneDayOutNum); + oneDayNumMap.put("transOneDayInNum", transOneDayInNum); + oneDayNumMap.put("scheduleOneDayInNum", scheduleOneDayInNum); + runtimeQuotaMap.put("oneDayNum", oneDayNumMap); + + //all broker current minute tps + totalTpsMap.put("totalNormalInTps", totalTpsMap.get("totalNormalInTps") + normalInTps); + totalTpsMap.put("totalNormalOutTps", totalTpsMap.get("totalNormalOutTps") + normalOutTps); + totalTpsMap.put("totalTransInTps", totalTpsMap.get("totalTransInTps") + transInTps); + totalTpsMap.put("totalScheduleInTps", totalTpsMap.get("totalScheduleInTps") + scheduleInTps); + + + //all broker one day num + totalOneDayNumMap.put("normalOneDayInNum", totalOneDayNumMap.get("normalOneDayInNum") + normalOneDayInNum); + totalOneDayNumMap.put("normalOneDayOutNum", totalOneDayNumMap.get("normalOneDayOutNum") + normalOneDayOutNum); + totalOneDayNumMap.put("transOneDayInNum", totalOneDayNumMap.get("transOneDayInNum") + transOneDayInNum); + totalOneDayNumMap.put("scheduleOneDayInNum", totalOneDayNumMap.get("scheduleOneDayInNum") + scheduleOneDayInNum); + + // putMessageAverageSize + runtimeQuotaMap.put("messageAverageSize", kvTable.getTable().get("putMessageAverageSize")); + + //topicSize + runtimeQuotaMap.put("topicSize", topicConfigSerializeWrapper.getTopicConfigTable().size()); + runtimeQuotaMap.put("groupSize", subscriptionGroupWrapper.getSubscriptionGroupTable().size()); + return runtimeQuotaMap; + } + + private void initTotalMap(Map totalTpsMap, Map totalOneDayNumMap) { + totalTpsMap.put("totalNormalInTps", 0.0); + totalTpsMap.put("totalNormalOutTps", 0.0); + totalTpsMap.put("totalTransInTps", 0.0); + totalTpsMap.put("totalScheduleInTps", 0.0); + + totalOneDayNumMap.put("normalOneDayInNum", 0L); + totalOneDayNumMap.put("normalOneDayOutNum", 0L); + totalOneDayNumMap.put("transOneDayInNum", 0L); + totalOneDayNumMap.put("scheduleOneDayInNum", 0L); + } +} diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java index 5fa0a1013f6e3f5155a3cb0d9266e1d6e4766b82..f1f237ddda506aabf0f11bad2f5e01e2a1cbcf0b 100644 --- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java +++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.OffsetWrapper; import org.apache.rocketmq.common.admin.TopicOffset; @@ -55,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -249,6 +251,14 @@ public class DefaultMQAdminExtTest { consumerRunningInfo.setStatusTable(new TreeMap()); consumerRunningInfo.setSubscriptionSet(new TreeSet()); when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo); + + TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); + topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap() { + { + put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig")); + } + }); + when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper); } @AfterClass @@ -464,4 +474,10 @@ public class DefaultMQAdminExtTest { assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0), System.currentTimeMillis())).isEqualTo(1011L); } + + @Test + public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException { + TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig"); + assertThat(topicConfig.getTopicName().equals("topic_test_examine_topicConfig")); + } } \ No newline at end of file