提交 f1b0318d 编写于 作者: R RongtongJin

Merge remote-tracking branch 'apache/develop' into 5.0.0-alpha

# Conflicts:
#	.travis.yml
#	acl/pom.xml
#	broker/pom.xml
#	client/pom.xml
#	common/pom.xml
#	common/src/main/java/org/apache/rocketmq/common/MQVersion.java
#	distribution/pom.xml
#	example/pom.xml
#	filter/pom.xml
#	logappender/pom.xml
#	logging/pom.xml
#	namesrv/pom.xml
#	openmessaging/pom.xml
#	pom.xml
#	remoting/pom.xml
#	srvutil/pom.xml
#	store/pom.xml
#	store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
#	test/pom.xml
#	tools/pom.xml
#	tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
#	tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
#	tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
#	tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
#	tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
......@@ -16,6 +16,8 @@ matrix:
# On Linux we install latest OpenJDK 1.8 from Ubuntu repositories
- name: Linux x86_64
arch: amd64
# - name: Linux aarch64
# arch: arm64
cache:
directories:
......@@ -41,6 +43,7 @@ before_script:
- ulimit -c unlimited
script:
- mvn verify
- travis_retry mvn -B clean apache-rat:check
- travis_retry mvn -B package jacoco:report coveralls:report
......
......@@ -95,8 +95,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
return null;
}
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
SendMessageContext mqtraceContext;
mqtraceContext = new SendMessageContext();
SendMessageContext mqtraceContext = new SendMessageContext();
mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
mqtraceContext.setNamespace(namespace);
mqtraceContext.setTopic(requestHeader.getTopic());
......
......@@ -56,6 +56,7 @@ import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
......@@ -186,7 +187,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
......@@ -237,6 +238,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (correctTopic != null) {
backTopic = correctTopic;
}
if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), r.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
this.brokerController.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), r.getAppendMessageResult().getWroteBytes());
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
......@@ -653,8 +660,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private String diskUtil() {
String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
double physicRatio = 100;
String storePath = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String storePathPhysic : paths) {
physicRatio = Math.min(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
}
String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
......
......@@ -158,6 +158,7 @@ public class ClientConfig {
this.useTLS = cc.useTLS;
this.namespace = cc.namespace;
this.language = cc.language;
this.mqClientApiTimeout = cc.mqClientApiTimeout;
}
public ClientConfig cloneClientConfig() {
......@@ -176,6 +177,7 @@ public class ClientConfig {
cc.useTLS = useTLS;
cc.namespace = namespace;
cc.language = language;
cc.mqClientApiTimeout = mqClientApiTimeout;
return cc;
}
......
......@@ -47,7 +47,7 @@ public class MessageSelector {
}
/**
* Use SLQ92 to select message.
* Use SQL92 to select message.
*
* @param sql if null or empty, will be treated as select all message.
*/
......
......@@ -675,20 +675,13 @@ public class MQClientAPIImpl {
retryBrokerName = mqChosen.getBrokerName();
}
String addr = instance.findBrokerAddressInPublish(retryBrokerName);
log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
retryBrokerName);
log.warn(String.format("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
retryBrokerName), e);
try {
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes, context, producer);
} catch (InterruptedException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingConnectException e1) {
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, true, producer);
} catch (RemotingTooMuchRequestException e1) {
} catch (InterruptedException | RemotingTooMuchRequestException e1) {
onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
context, false, producer);
} catch (RemotingException e1) {
......
......@@ -983,11 +983,7 @@ public class MQClientInstance {
try {
this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());
log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
} catch (RemotingException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (InterruptedException e) {
log.error("unregister client exception from broker: " + addr, e);
} catch (MQBrokerException e) {
} catch (RemotingException | InterruptedException | MQBrokerException e) {
log.error("unregister client exception from broker: " + addr, e);
}
}
......
......@@ -634,14 +634,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
} catch (RemotingException | MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
......@@ -936,19 +929,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
} catch (RemotingException | MQBrokerException | InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
......
......@@ -640,9 +640,6 @@ public class MQVersion {
V4_9_9_SNAPSHOT,
V4_9_9,
V5_0_0_PREVIEW_SNAPSHOT,
V5_0_0_PREVIEW,
V5_0_0_SNAPSHOT,
V5_0_0,
......
......@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
......@@ -52,12 +51,11 @@ public class UtilAll {
public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName(); // format: "pid@hostname"
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@')));
} catch (Exception e) {
return -1;
}
......@@ -198,6 +196,11 @@ public class UtilAll {
cal.get(Calendar.SECOND));
}
public static boolean isPathExists(final String path) {
File file = new File(path);
return file.exists();
}
public static double getDiskPartitionSpaceUsedPercent(final String path) {
if (null == path || path.isEmpty()) {
log.error("Error when measuring disk space usage, path is null or empty, path : {}", path);
......
......@@ -28,4 +28,16 @@ public class RemotingUtilTest {
assertThat(localAddress).isNotNull();
assertThat(localAddress.length()).isGreaterThan(0);
}
@Test
public void testConvert2IpStringWithIp() {
String result = RemotingUtil.convert2IpString("127.0.0.1:9876");
assertThat(result).isEqualTo("127.0.0.1:9876");
}
@Test
public void testConvert2IpStringWithHost() {
String result = RemotingUtil.convert2IpString("localhost:9876");
assertThat(result).isEqualTo("127.0.0.1:9876");
}
}
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ -z "$ROCKETMQ_HOME" ]; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ]; do
ls=$(ls -ld "$PRG")
link=$(expr "$ls" : '.*-> \(.*\)$')
if expr "$link" : '/.*' >/dev/null; then
PRG="$link"
else
PRG="$(dirname "$PRG")/$link"
fi
done
saveddir=$(pwd)
ROCKETMQ_HOME=$(dirname "$PRG")/..
# make it fully qualified
ROCKETMQ_HOME=$(cd "$ROCKETMQ_HOME" && pwd)
cd "$saveddir"
fi
export ROCKETMQ_HOME
namesrvAddr=
while [ -z "${namesrvAddr}" ]; do
read -p "Enter name server address list:" namesrvAddr
done
clusterName=
while [ -z "${clusterName}" ]; do
read -p "Choose a cluster to export:" clusterName
done
read -p "Enter file path to export [default /tmp/rocketmq/export]:" filePath
if [ -z "${filePath}" ]; then
filePath="/tmp/rocketmq/config"
fi
if [[ -e ${filePath} ]]; then
rm -rf ${filePath}
fi
sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetrics -c ${clusterName} -n ${namesrvAddr} -f ${filePath}
sh ${ROCKETMQ_HOME}/bin/mqadmin exportConfigs -c ${clusterName} -n ${namesrvAddr} -f ${filePath}
sh ${ROCKETMQ_HOME}/bin/mqadmin exportMetadata -c ${clusterName} -n ${namesrvAddr} -f ${filePath}
cd ${filePath} || exit
configs=$(cat ./configs.json)
if [ -z "$configs" ]; then
configs="{}"
fi
metadata=$(cat ./metadata.json)
if [ -z "$metadata" ]; then
metadata="{}"
fi
metrics=$(cat ./metrics.json)
if [ -z "$metrics" ]; then
metrics="{}"
fi
echo "{
\"configs\": ${configs},
\"metadata\": ${metadata},
\"metrics\": ${metrics}
}" >rocketmq-metadata-export.json
echo -e "[INFO] The RocketMQ metadata has been exported to the file:${filePath}/rocketmq-metadata-export.json"
......@@ -572,7 +572,7 @@ public class ListSplitter implements Iterator<List<Message>> {
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
......
......@@ -57,7 +57,7 @@ public class ListSplitter implements Iterator<List<Message>> {
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
......@@ -78,4 +78,4 @@ while (splitter.hasNext()) {
// handle the error
}
}
```
\ No newline at end of file
```
......@@ -36,7 +36,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
Set<String> topics = new HashSet<>();
......@@ -46,99 +46,101 @@ public class PullConsumer {
consumer.start();
ExecutorService executors = Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PullConsumerThread");
}
});
for(String topic : consumer.getRegisterTopics()){
executors.execute(new Runnable() {
public void doSomething(List<MessageExt> msgs){
//do you business
System.out.println(msgs);
}
@Override
public void run() {
while(true){
try {
Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance(topic);
if(messageQueues == null || messageQueues.isEmpty()){
Thread.sleep(1000);
continue;
}
PullResult pullResult = null;
for(MessageQueue messageQueue : messageQueues){
try {
long offset = this.consumeFromOffset(messageQueue);
pullResult = consumer.pull(messageQueue, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
if(msgs != null && !msgs.isEmpty()){
this.doSomething(msgs);
//update offset to broker
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
//print pull tps
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
}
break;
case OFFSET_ILLEGAL:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_NEW_MSG:
Thread.sleep(1);
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_MATCHED_MSG:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
default:
}
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (Exception e){
e.printStackTrace();
}
}
} catch (MQClientException e) {
//reblance error
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e){
e.printStackTrace();
}
}
}
public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException{
//-1 when started
long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
if(offset < 0){
//query from broker
offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
}
if (offset < 0){
//first time start from last offset
offset = consumer.maxOffset(messageQueue);
});
for (String topic : consumer.getRegisterTopics()) {
executors.execute(new Runnable() {
public void doSomething(List<MessageExt> msgs) {
//do you business
}
@Override
public void run() {
while (true) {
try {
Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance(topic);
if (messageQueues == null || messageQueues.isEmpty()) {
Thread.sleep(1000);
continue;
}
PullResult pullResult = null;
for (MessageQueue messageQueue : messageQueues) {
try {
long offset = this.consumeFromOffset(messageQueue);
pullResult = consumer.pull(messageQueue, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
if (msgs != null && !msgs.isEmpty()) {
this.doSomething(msgs);
//update offset to broker
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
//print pull tps
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
}
break;
case OFFSET_ILLEGAL:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_NEW_MSG:
Thread.sleep(1);
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_MATCHED_MSG:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
default:
}
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (MQClientException e) {
//reblance error
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public long consumeFromOffset(MessageQueue messageQueue) throws MQClientException {
//-1 when started
long offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
if (offset < 0) {
//query from broker
offset = consumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
}
if (offset < 0) {
//first time start from last offset
offset = consumer.maxOffset(messageQueue);
}
//make sure
if (offset < 0){
offset = 0;
if (offset < 0) {
offset = 0;
}
return offset;
}
public void incPullTPS(String topic, int pullSize) {
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
}
});
return offset;
}
public void incPullTPS(String topic, int pullSize) {
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, pullSize);
}
});
}
// executors.shutdown();
// consumer.shutdown();
......
......@@ -165,6 +165,10 @@ public class RemotingUtil {
return sb.toString();
}
public static String convert2IpString(final String addr) {
return socketAddress2String(string2SocketAddress(addr));
}
private static boolean isBridge(NetworkInterface networkInterface) {
try {
if (isLinuxPlatform()) {
......
......@@ -22,10 +22,12 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
......@@ -43,6 +45,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
......@@ -71,9 +74,20 @@ public class CommitLog {
protected final PutMessageLock putMessageLock;
private volatile Set<String> fullStorePaths = Collections.emptySet();
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
......@@ -95,6 +109,14 @@ public class CommitLog {
}
public void setFullStorePaths(Set<String> fullStorePaths) {
this.fullStorePaths = fullStorePaths;
}
public Set<String> getFullStorePaths() {
return fullStorePaths;
}
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
......
......@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
......@@ -167,6 +168,8 @@ public class DefaultMessageStore implements MessageStore {
File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
MappedFile.ensureDirOK(file.getParent());
MappedFile.ensureDirOK(getStorePathPhysic());
MappedFile.ensureDirOK(getStorePathLogic());
lockFile = new RandomAccessFile(file, "rw");
}
......@@ -190,10 +193,6 @@ public class DefaultMessageStore implements MessageStore {
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// load Commit Log
result = result && this.commitLog.load();
......@@ -209,7 +208,12 @@ public class DefaultMessageStore implements MessageStore {
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
if (null != scheduleMessageService) {
result = this.scheduleMessageService.load();
}
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
......@@ -539,7 +543,8 @@ public class DefaultMessageStore implements MessageStore {
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
// lazy init when find msg.
GetMessageResult getResult = null;
final long maxOffsetPy = this.commitLog.getMaxOffset();
......@@ -576,6 +581,9 @@ public class DefaultMessageStore implements MessageStore {
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
getResult = new GetMessageResult(maxMsgNums);
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
......@@ -679,6 +687,11 @@ public class DefaultMessageStore implements MessageStore {
long elapsedTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
// lazy init no data found.
if (getResult == null) {
getResult = new GetMessageResult(0);
}
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
......@@ -782,8 +795,8 @@ public class DefaultMessageStore implements MessageStore {
return this.storeStatsService.toString();
}
private String getStorePathPhysic() {
String storePathPhysic = "";
public String getStorePathPhysic() {
String storePathPhysic;
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog()) {
storePathPhysic = ((DLedgerCommitLog)DefaultMessageStore.this.getCommitLog()).getdLedgerServer().getdLedgerConfig().getDataStorePath();
} else {
......@@ -792,20 +805,29 @@ public class DefaultMessageStore implements MessageStore {
return storePathPhysic;
}
public String getStorePathLogic() {
return StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
}
@Override
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
double minPhysicsUsedRatio = Double.MAX_VALUE;
String commitLogStorePath = getStorePathPhysic();
String[] paths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String clPath : paths) {
double physicRatio = UtilAll.isPathExists(clPath) ?
UtilAll.getDiskPartitionSpaceUsedPercent(clPath) : -1;
result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
minPhysicsUsedRatio = Math.min(minPhysicsUsedRatio, physicRatio);
}
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(minPhysicsUsedRatio));
}
{
String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathLogic());
result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
}
......@@ -1681,25 +1703,43 @@ public class DefaultMessageStore implements MessageStore {
cleanImmediately = false;
{
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
if (physicRatio > diskSpaceWarningLevelRatio) {
String commitLogStorePath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
Set<String> fullStorePath = new HashSet<>();
double minPhysicRatio = 100;
String minStorePath = null;
for (String storePathPhysic : storePaths) {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
if (minPhysicRatio > physicRatio) {
minPhysicRatio = physicRatio;
minStorePath = storePathPhysic;
}
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(storePathPhysic);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
if (minPhysicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio +
", so mark disk full, storePathPhysic=" + minStorePath);
}
cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio +
", so mark disk ok, storePathPhysic=" + minStorePath);
}
}
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
if (minPhysicRatio < 0 || minPhysicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
+ minPhysicRatio + ", storePathPhysic=" + minStorePath);
return true;
}
}
......@@ -1740,8 +1780,27 @@ public class DefaultMessageStore implements MessageStore {
public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
}
public double calcStorePathPhysicRatio() {
Set<String> fullStorePath = new HashSet<>();
String storePath = getStorePathPhysic();
String[] paths = storePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
double minPhysicRatio = 100;
for (String path : paths) {
double physicRatio = UtilAll.isPathExists(path) ?
UtilAll.getDiskPartitionSpaceUsedPercent(path) : -1;
minPhysicRatio = Math.min(minPhysicRatio, physicRatio);
if (physicRatio > diskSpaceCleanForciblyRatio) {
fullStorePath.add(path);
}
}
DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);
return minPhysicRatio;
}
public boolean isSpaceFull() {
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
double physicRatio = calcStorePathPhysicRatio();
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
if (physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
......
......@@ -23,11 +23,9 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class GetMessageResult {
private final List<SelectMappedBufferResult> messageMapedList =
new ArrayList<SelectMappedBufferResult>(100);
private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
private final List<Long> messageQueueOffset = new ArrayList<>(100);
private final List<SelectMappedBufferResult> messageMapedList;
private final List<ByteBuffer> messageBufferList;
private final List<Long> messageQueueOffset;
private GetMessageStatus status;
private long nextBeginOffset;
......@@ -41,6 +39,15 @@ public class GetMessageResult {
private int msgCount4Commercial = 0;
public GetMessageResult() {
messageMapedList = new ArrayList<>(100);
messageBufferList = new ArrayList<>(100);
messageQueueOffset = new ArrayList<>(100);
}
public GetMessageResult(int resultSize) {
messageMapedList = new ArrayList<>(resultSize);
messageBufferList = new ArrayList<>(resultSize);
messageQueueOffset = new ArrayList<>(resultSize);
}
public GetMessageStatus getStatus() {
......
......@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
......@@ -37,13 +38,13 @@ public class MappedFileQueue {
private final String storePath;
private final int mappedFileSize;
protected final int mappedFileSize;
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private final AllocateMappedFileService allocateMappedFileService;
private long flushedWhere = 0;
protected long flushedWhere = 0;
private long committedWhere = 0;
private volatile long storeTimestamp = 0;
......@@ -144,35 +145,40 @@ public class MappedFileQueue {
}
}
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
}
File[] ls = dir.listFiles();
if (ls != null) {
return doLoad(Arrays.asList(ls));
}
return true;
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
public boolean doLoad(List<File> files) {
// ascending order
files.sort(Comparator.comparing(File::getName));
for (File file : files) {
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
return true;
}
......@@ -204,33 +210,41 @@ public class MappedFileQueue {
}
if (createOffset != -1 && needCreate) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
return tryCreateMappedFile(createOffset);
}
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
return mappedFileLast;
}
protected MappedFile tryCreateMappedFile(long createOffset) {
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
return mappedFile;
this.mappedFiles.add(mappedFile);
}
return mappedFileLast;
return mappedFile;
}
public MappedFile getLastMappedFile(final long startOffset) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class MultiPathMappedFileQueue extends MappedFileQueue {
private final MessageStoreConfig config;
private final Supplier<Set<String>> fullStorePathsSupplier;
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService,
Supplier<Set<String>> fullStorePathsSupplier) {
super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
this.config = messageStoreConfig;
this.fullStorePathsSupplier = fullStorePathsSupplier;
}
private Set<String> getPaths() {
String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
return new HashSet<>(Arrays.asList(paths));
}
private Set<String> getReadonlyPaths() {
String pathStr = config.getReadOnlyCommitLogStorePaths();
if (StringUtils.isBlank(pathStr)) {
return Collections.emptySet();
}
String[] paths = pathStr.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
return new HashSet<>(Arrays.asList(paths));
}
@Override
public boolean load() {
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());
List<File> files = new ArrayList<>();
for (String path : storePathSet) {
File dir = new File(path);
File[] ls = dir.listFiles();
if (ls != null) {
Collections.addAll(files, ls);
}
}
return doLoad(files);
}
@Override
protected MappedFile tryCreateMappedFile(long createOffset) {
long fileIdx = createOffset / this.mappedFileSize;
Set<String> storePath = getPaths();
Set<String> readonlyPathSet = getReadonlyPaths();
Set<String> fullStorePaths =
fullStorePathsSupplier == null ? Collections.emptySet() : fullStorePathsSupplier.get();
HashSet<String> availableStorePath = new HashSet<>(storePath);
//do not create file in readonly store path.
availableStorePath.removeAll(readonlyPathSet);
//do not create file is space is nearly full.
availableStorePath.removeAll(fullStorePaths);
//if no store path left, fall back to writable store path.
if (availableStorePath.isEmpty()) {
availableStorePath = new HashSet<>(storePath);
availableStorePath.removeAll(readonlyPathSet);
}
String[] paths = availableStorePath.toArray(new String[]{});
Arrays.sort(paths);
String nextFilePath = paths[(int) (fileIdx % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset);
String nextNextFilePath = paths[(int) ((fileIdx + 1) % paths.length)] + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
return doCreateMappedFile(nextFilePath, nextNextFilePath);
}
@Override
public void destroy() {
for (MappedFile mf : this.mappedFiles) {
mf.destroy(1000 * 3);
}
this.mappedFiles.clear();
this.flushedWhere = 0;
Set<String> storePathSet = getPaths();
storePathSet.addAll(getReadonlyPaths());
for (String path : storePathSet) {
File file = new File(path);
if (file.isDirectory()) {
file.delete();
}
}
}
}
......@@ -17,10 +17,14 @@
package org.apache.rocketmq.store.config;
import java.io.File;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
public class MessageStoreConfig {
public static final String MULTI_PATH_SPLITTER = System.getProperty("rocketmq.broker.multiPathSplitter", ",");
//The root directory in which the log data is kept
@ImportantField
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
......@@ -30,6 +34,8 @@ public class MessageStoreConfig {
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
private String readOnlyCommitLogStorePaths = null;
// CommitLog file size,default is 1G
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W
......@@ -676,6 +682,13 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
public String getReadOnlyCommitLogStorePaths() {
return readOnlyCommitLogStorePaths;
}
public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) {
this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths;
}
public String getdLegerGroup() {
return dLegerGroup;
}
......
......@@ -81,8 +81,7 @@ public class ScheduleMessageService extends ConfigManager {
}
/**
* @param writeMessageStore
* the writeMessageStore to set
* @param writeMessageStore the writeMessageStore to set
*/
public void setWriteMessageStore(MessageStore writeMessageStore) {
this.writeMessageStore = writeMessageStore;
......@@ -136,7 +135,9 @@ public class ScheduleMessageService extends ConfigManager {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
......@@ -168,9 +169,46 @@ public class ScheduleMessageService extends ConfigManager {
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
result = result && this.correctDelayOffset();
return result;
}
public boolean correctDelayOffset() {
try {
for (int delayLevel : delayLevelTable.keySet()) {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
Long currentDelayOffset = offsetTable.get(delayLevel);
if (currentDelayOffset == null || cq == null) {
continue;
}
long correctDelayOffset = currentDelayOffset;
long cqMinOffset = cq.getMinOffsetInQueue();
long cqMaxOffset = cq.getMaxOffsetInQueue();
if (currentDelayOffset < cqMinOffset) {
correctDelayOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
if (currentDelayOffset > cqMaxOffset) {
correctDelayOffset = cqMaxOffset;
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
if (correctDelayOffset != currentDelayOffset) {
log.error("correct delay offset [ delayLevel {} ] from {} to {}", delayLevel, currentDelayOffset, correctDelayOffset);
offsetTable.put(delayLevel, correctDelayOffset);
}
}
} catch (Exception e) {
log.error("correctDelayOffset exception", e);
return false;
}
return true;
}
@Override
public String configFilePath() {
return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
......@@ -323,7 +361,7 @@ public class ScheduleMessageService extends ConfigManager {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
msgInner.getTopic(), msgInner);
continue;
}
PutMessageResult putMessageResult =
......@@ -358,14 +396,9 @@ public class ScheduleMessageService extends ConfigManager {
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
}
}
} else {
......@@ -390,10 +423,17 @@ public class ScheduleMessageService extends ConfigManager {
else {
long cqMinOffset = cq.getMinOffsetInQueue();
long cqMaxOffset = cq.getMaxOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
if (offset > cqMaxOffset) {
failScheduleOffset = cqMaxOffset;
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
}
}
} // end of if (cq != null)
......
......@@ -94,6 +94,41 @@ public class DefaultMessageStoreCleanFilesTest {
}
@Test
public void testIsSpaceFullMultiCommitLogStorePath() throws Exception {
String deleteWhen = "04";
// the min value of diskMaxUsedSpaceRatio.
int diskMaxUsedSpaceRatio = 1;
// used to set disk-full flag
double diskSpaceCleanForciblyRatio = 0.01D;
MessageStoreConfig config = genMessageStoreConfig(deleteWhen, diskMaxUsedSpaceRatio);
String storePath = config.getStorePathCommitLog();
StringBuilder storePathBuilder = new StringBuilder();
for (int i = 0; i < 3; i++) {
storePathBuilder.append(storePath).append(i).append(MessageStoreConfig.MULTI_PATH_SPLITTER);
}
config.setStorePathCommitLog(storePathBuilder.toString());
String[] paths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
assertEquals(3, paths.length);
initMessageStore(config, diskSpaceCleanForciblyRatio);
// build and put 55 messages, exactly one message per CommitLog file.
buildAndPutMessagesToMessageStore(msgCount);
MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog();
assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size());
int fileCountConsumeQueue = getFileCountConsumeQueue();
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
cleanCommitLogService.isSpaceFull();
assertEquals(1 << 4, messageStore.getRunningFlags().getFlagBits() & (1 << 4));
messageStore.shutdown();
messageStore.destroy();
}
@Test
public void testIsSpaceFullFunctionFull2Empty() throws Exception {
String deleteWhen = "04";
......@@ -421,6 +456,10 @@ public class DefaultMessageStoreCleanFilesTest {
}
private void initMessageStore(String deleteWhen, int diskMaxUsedSpaceRatio, double diskSpaceCleanForciblyRatio) throws Exception {
initMessageStore(genMessageStoreConfig(deleteWhen,diskMaxUsedSpaceRatio), diskSpaceCleanForciblyRatio);
}
private MessageStoreConfig genMessageStoreConfig(String deleteWhen, int diskMaxUsedSpaceRatio) {
MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest();
messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize);
......@@ -442,7 +481,10 @@ public class DefaultMessageStoreCleanFilesTest {
String storePathCommitLog = storePathRootDir + File.separator + "commitlog";
messageStoreConfig.setStorePathRootDir(storePathRootDir);
messageStoreConfig.setStorePathCommitLog(storePathCommitLog);
return messageStoreConfig;
}
private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception {
messageStore = new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig());
......
......@@ -605,6 +605,22 @@ public class DefaultMessageStoreTest {
}
}
@Test
public void testStorePathOK() {
if (messageStore instanceof DefaultMessageStore) {
assertTrue(fileExists(((DefaultMessageStore) messageStore).getStorePathPhysic()));
assertTrue(fileExists(((DefaultMessageStore) messageStore).getStorePathLogic()));
}
}
private boolean fileExists(String path) {
if (path != null) {
File f = new File(path);
return f.exists();
}
return false;
}
private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
public class MultiPathMappedFileQueueTest {
@Test
public void testGetLastMappedFile() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testLoadReadOnlyMappedFiles() {
{
//create old mapped files
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
}
mappedFileQueue.shutdown(1000);
}
// test load and readonly
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/b/");
config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c");
MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
mappedFileQueue.load();
assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024);
for (int i = 0; i < 1024; i++) {
assertThat(mappedFileQueue.mappedFiles.get(i).getFile().getName())
.isEqualTo(UtilAll.offset2FileName(1024 * i));
}
mappedFileQueue.destroy();
}
@Test
public void testUpdatePathsOnline() {
final byte[] fixedMsg = new byte[1024];
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, null);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
int idx = i % storePaths.length;
assertThat(mappedFile.getFileName().startsWith(storePaths[idx])).isTrue();
if (i == 500) {
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/");
storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
}
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testFullStorePath() {
final byte[] fixedMsg = new byte[1024];
Set<String> fullStorePath = new HashSet<>();
MessageStoreConfig config = new MessageStoreConfig();
config.setStorePathCommitLog("target/unit_test_store/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/b/" + MessageStoreConfig.MULTI_PATH_SPLITTER
+ "target/unit_test_store/c/");
MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null, () -> fullStorePath);
String[] storePaths = config.getStorePathCommitLog().trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
assertThat(storePaths.length).isEqualTo(3);
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length);
assertThat(mappedFile.getFileName().startsWith(storePaths[1])).isTrue();
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 2);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue();
fullStorePath.add("target/unit_test_store/b/");
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 3);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[2])).isTrue();
mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * 4);
assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
assertThat(mappedFile.getFileName().startsWith(storePaths[0])).isTrue();
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ScheduleMessageServiceTest {
private Random random = new Random();
@Test
public void testCorrectDelayOffset_whenInit() throws Exception {
ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = null;
ScheduleMessageService scheduleMessageService = new ScheduleMessageService((DefaultMessageStore) buildMessageStore());
scheduleMessageService.parseDelayLevel();
ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable1 = new ConcurrentHashMap<>();
for (int i = 1; i <= 18; i++) {
offsetTable1.put(i, random.nextLong());
}
Field field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
field.setAccessible(true);
field.set(scheduleMessageService, offsetTable1);
String jsonStr = scheduleMessageService.encode();
scheduleMessageService.decode(jsonStr);
offsetTable = (ConcurrentMap<Integer, Long>) field.get(scheduleMessageService);
for (Map.Entry<Integer, Long> entry : offsetTable.entrySet()) {
assertEquals(entry.getValue(), offsetTable1.get(entry.getKey()));
}
scheduleMessageService.correctDelayOffset();
offsetTable = (ConcurrentMap<Integer, Long>) field.get(scheduleMessageService);
for (long offset : offsetTable.values()) {
assertEquals(offset, 0);
}
}
private MessageStore buildMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
}
}
......@@ -33,7 +33,6 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
......@@ -43,16 +42,12 @@ import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.MigrateLogicalQueueBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData;
import org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo;
import org.apache.rocketmq.common.protocol.route.MessageQueueRouteState;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
......@@ -507,10 +502,24 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException {
return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException {
return this.defaultMQAdminExtImpl.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis);
}
/* (non-Javadoc)
......
......@@ -104,6 +104,24 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
private long timeoutMillis = 20000;
private Random random = new Random();
private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
static {
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SELF_TEST_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.ONS_HTTP_PROXY_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PERMISSION_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_OWNER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_ONSAPI_PULL_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CID_SYS_RMQ_TRANS);
}
public DefaultMQAdminExtImpl(DefaultMQAdminExt defaultMQAdminExt, long timeoutMillis) {
this(defaultMQAdminExt, null, timeoutMillis);
}
......@@ -941,7 +959,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
if (brokerData != null) {
String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
String addr = RemotingUtil.convert2IpString(brokerData.getBrokerAddrs().get(MixAll.MASTER_ID));
if (RemotingUtil.socketAddress2String(msg.getStoreHost()).equals(addr)) {
if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
return true;
......@@ -1016,12 +1034,49 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
SubscriptionGroupWrapper subscriptionGroupWrapper = this.mqClientInstance.getMQClientAPIImpl()
.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
Iterator<Entry<String, SubscriptionGroupConfig>> iterator = subscriptionGroupWrapper.getSubscriptionGroupTable()
.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SubscriptionGroupConfig> configEntry = iterator.next();
if (MixAll.isSysConsumerGroup(configEntry.getKey()) || SYSTEM_GROUP_SET.contains(configEntry.getKey())) {
iterator.remove();
}
}
return subscriptionGroupWrapper;
}
@Override
public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
}
@Override
public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = this.getAllTopicConfig(brokerAddr, timeoutMillis);
TopicList topicList = this.mqClientInstance.getMQClientAPIImpl().getSystemTopicListFromBroker(brokerAddr,
timeoutMillis);
Iterator<Entry<String, TopicConfig>> iterator = topicConfigSerializeWrapper.getTopicConfigTable().entrySet()
.iterator();
while (iterator.hasNext()) {
String topic = iterator.next().getKey();
if (topicList.getTopicList().contains(topic) || (!specialTopic && (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)))) {
iterator.remove();
}
}
return topicConfigSerializeWrapper;
}
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
......
......@@ -247,10 +247,18 @@ public interface MQAdminExt extends MQAdmin {
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQBrokerException;
TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
long timeoutMillis) throws InterruptedException, RemotingException,
MQBrokerException, MQClientException;
void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException;
......
......@@ -146,5 +146,4 @@ public class CommandUtil {
}
throw new Exception(ERROR_MESSAGE);
}
}
......@@ -58,6 +58,8 @@ import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCo
import org.apache.rocketmq.tools.command.logicalqueue.QueryTopicLogicalQueueMappingCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
import org.apache.rocketmq.tools.command.export.ExportConfigsCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
......@@ -80,6 +82,7 @@ import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
import org.apache.rocketmq.tools.command.export.ExportMetadataCommand;
import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicListSubCommand;
import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
......@@ -160,7 +163,6 @@ public class MQAdminStartup {
initCommand(new UpdateTopicSubCommand());
initCommand(new DeleteTopicSubCommand());
initCommand(new UpdateSubGroupSubCommand());
initCommand(new SetConsumeModeSubCommand());
initCommand(new DeleteSubscriptionGroupCommand());
initCommand(new UpdateBrokerConfigSubCommand());
initCommand(new UpdateTopicPermSubCommand());
......@@ -230,6 +232,10 @@ public class MQAdminStartup {
initCommand(new QueryTopicLogicalQueueMappingCommand());
initCommand(new MigrateTopicLogicalQueueCommand());
initCommand(new UpdateTopicLogicalQueueNumCommand());
initCommand(new ExportMetadataCommand());
initCommand(new ExportConfigsCommand());
initCommand(new ExportMetricsCommand());
}
private static void initLogback() throws JoranException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.export;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.alibaba.fastjson.JSON;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ExportConfigsCommand implements SubCommand {
@Override
public String commandName() {
return "exportConfigs";
}
@Override
public String commandDesc() {
return "export configs";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "filePath", true,
"export configs.json path | default /tmp/rocketmq/export");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String clusterName = commandLine.getOptionValue('c').trim();
String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f')
.trim();
defaultMQAdminExt.start();
Map<String, Object> result = new HashMap<>();
// name servers
List<String> nameServerAddressList = defaultMQAdminExt.getNameServerAddressList();
//broker
int masterBrokerSize = 0;
int slaveBrokerSize = 0;
Map<String, Properties> brokerConfigs = new HashMap<>();
Map<String, List<String>> masterAndSlaveMap
= CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
for (String masterAddr : masterAndSlaveMap.keySet()) {
Properties masterProperties = defaultMQAdminExt.getBrokerConfig(masterAddr);
masterBrokerSize++;
slaveBrokerSize += masterAndSlaveMap.get(masterAddr).size();
brokerConfigs.put(masterProperties.getProperty("brokerName"), needBrokerProprties(masterProperties));
}
Map<String, Integer> clusterScaleMap = new HashMap<>();
clusterScaleMap.put("namesrvSize", nameServerAddressList.size());
clusterScaleMap.put("masterBrokerSize", masterBrokerSize);
clusterScaleMap.put("slaveBrokerSize", slaveBrokerSize);
result.put("brokerConfigs", brokerConfigs);
result.put("clusterScale", clusterScaleMap);
String path = filePath + "/configs.json";
MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path);
System.out.printf("export %s success", path);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
private Properties needBrokerProprties(Properties properties) {
Properties newProperties = new Properties();
newProperties.setProperty("brokerClusterName", properties.getProperty("brokerClusterName"));
newProperties.setProperty("brokerId", properties.getProperty("brokerId"));
newProperties.setProperty("brokerName", properties.getProperty("brokerName"));
newProperties.setProperty("brokerRole", properties.getProperty("brokerRole"));
newProperties.setProperty("fileReservedTime", properties.getProperty("fileReservedTime"));
newProperties.setProperty("filterServerNums", properties.getProperty("filterServerNums"));
newProperties.setProperty("flushDiskType", properties.getProperty("flushDiskType"));
newProperties.setProperty("maxMessageSize", properties.getProperty("maxMessageSize"));
newProperties.setProperty("messageDelayLevel", properties.getProperty("messageDelayLevel"));
newProperties.setProperty("msgTraceTopicName", properties.getProperty("msgTraceTopicName"));
newProperties.setProperty("slaveReadEnable", properties.getProperty("slaveReadEnable"));
newProperties.setProperty("traceOn", properties.getProperty("traceOn"));
newProperties.setProperty("traceTopicEnable", properties.getProperty("traceTopicEnable"));
newProperties.setProperty("useTLS", properties.getProperty("useTLS"));
newProperties.setProperty("autoCreateTopicEnable", properties.getProperty("autoCreateTopicEnable"));
newProperties.setProperty("autoCreateSubscriptionGroup", properties.getProperty("autoCreateSubscriptionGroup"));
return newProperties;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.export;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.fastjson.JSON;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ExportMetadataCommand implements SubCommand {
private static final String DEFAULT_FILE_PATH = "/tmp/rocketmq/export";
@Override
public String commandName() {
return "exportMetadata";
}
@Override
public String commandDesc() {
return "export metadata";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("b", "brokerAddr", true, "choose a broker to export");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("f", "filePath", true, "export metadata.json path | default /tmp/rocketmq/export");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topic", false, "only export topic metadata");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("g", "subscriptionGroup", false, "only export subscriptionGroup metadata");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("s", "specialTopic", false, "need retryTopic and dlqTopic");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
defaultMQAdminExt.start();
String filePath = !commandLine.hasOption('f') ? DEFAULT_FILE_PATH : commandLine.getOptionValue('f')
.trim();
boolean specialTopic = commandLine.hasOption('s');
if (commandLine.hasOption('b')) {
final String brokerAddr = commandLine.getOptionValue('b').trim();
if (commandLine.hasOption('t')) {
filePath = filePath + "/topic.json";
TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig(
brokerAddr, specialTopic, 10000L);
MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath);
System.out.printf("export %s success", filePath);
} else if (commandLine.hasOption('g')) {
filePath = filePath + "/subscriptionGroup.json";
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(
brokerAddr, 10000L);
MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath);
System.out.printf("export %s success", filePath);
}
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Map<String, TopicConfig> topicConfigMap = new HashMap<>();
Map<String, SubscriptionGroupConfig> subGroupConfigMap = new HashMap<>();
for (String addr : masterSet) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig(
addr, specialTopic, 10000L);
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(
addr, 10000);
if (commandLine.hasOption('t')) {
filePath = filePath + "/topic.json";
MixAll.string2FileNotSafe(JSON.toJSONString(topicConfigSerializeWrapper, true), filePath);
System.out.printf("export %s success", filePath);
return;
} else if (commandLine.hasOption('g')) {
filePath = filePath + "/subscriptionGroup.json";
MixAll.string2FileNotSafe(JSON.toJSONString(subscriptionGroupWrapper, true), filePath);
System.out.printf("export %s success", filePath);
return;
} else {
for (Map.Entry<String, TopicConfig> entry : topicConfigSerializeWrapper.getTopicConfigTable().entrySet()) {
TopicConfig topicConfig = topicConfigMap.get(entry.getKey());
if (null != topicConfig) {
entry.getValue().setWriteQueueNums(
topicConfig.getWriteQueueNums() + entry.getValue().getWriteQueueNums());
entry.getValue().setReadQueueNums(
topicConfig.getReadQueueNums() + entry.getValue().getReadQueueNums());
}
topicConfigMap.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SubscriptionGroupConfig> entry : subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet()) {
SubscriptionGroupConfig subscriptionGroupConfig = subGroupConfigMap.get(entry.getKey());
if (null != subscriptionGroupConfig) {
entry.getValue().setRetryQueueNums(
subscriptionGroupConfig.getRetryQueueNums() + entry.getValue().getRetryQueueNums());
}
subGroupConfigMap.put(entry.getKey(), entry.getValue());
}
Map<String, Object> result = new HashMap<>();
result.put("topicConfigTable", topicConfigMap);
result.put("subscriptionGroupTable", subGroupConfigMap);
result.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
result.put("exportTime", System.currentTimeMillis());
filePath = filePath + "/metadata.json";
MixAll.string2FileNotSafe(JSON.toJSONString(result, true), filePath);
System.out.printf("export %s success", filePath);
}
}
} else {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.export;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.alibaba.fastjson.JSON;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
public class ExportMetricsCommand implements SubCommand {
@Override
public String commandName() {
return "exportMetrics";
}
@Override
public String commandDesc() {
return "export metrics";
}
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("c", "clusterName", true, "choose a cluster to export");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("f", "filePath", true,
"export metrics.json path | default /tmp/rocketmq/export");
opt.setRequired(false);
options.addOption(opt);
return options;
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String clusterName = commandLine.getOptionValue('c').trim();
String filePath = !commandLine.hasOption('f') ? "/tmp/rocketmq/export" : commandLine.getOptionValue('f')
.trim();
defaultMQAdminExt.start();
Map<String, Map<String, Map<String, Object>>> evaluateReportMap = new HashMap<>();
Map<String, Double> totalTpsMap = new HashMap<>();
Map<String, Long> totalOneDayNumMap = new HashMap<>();
initTotalMap(totalTpsMap, totalOneDayNumMap);
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
for (String brokerName : brokerNameSet) {
BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
if (brokerData != null) {
String addr = brokerData.getBrokerAddrs().get(0L);
KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(addr);
Properties properties = defaultMQAdminExt.getBrokerConfig(addr);
SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getUserSubscriptionGroup(addr,
10000);
Map<String, Map<String, Object>> brokerInfo = new HashMap<>();
//broker environment,machine configuration
brokerInfo.put("runtimeEnv", getRuntimeEnv(kvTable, properties));
brokerInfo.put("runtimeQuota",
getRuntimeQuota(kvTable, defaultMQAdminExt, addr, totalTpsMap,
totalOneDayNumMap, subscriptionGroupWrapper));
// runtime version
brokerInfo.put("runtimeVersion",
getRuntimeVersion(defaultMQAdminExt, subscriptionGroupWrapper));
evaluateReportMap.put(brokerName, brokerInfo);
}
}
String path = filePath + "/metrics.json";
Map<String, Object> totalData = new HashMap<>();
totalData.put("totalTps", totalTpsMap);
totalData.put("totalOneDayNum", totalOneDayNumMap);
Map<String, Object> result = new HashMap<>();
result.put("evaluateReport", evaluateReportMap);
result.put("totalData", totalData);
MixAll.string2FileNotSafe(JSON.toJSONString(result, true), path);
System.out.printf("export %s success", path);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
private Map<String, Object> getRuntimeVersion(DefaultMQAdminExt defaultMQAdminExt,
SubscriptionGroupWrapper subscriptionGroupWrapper) {
Map<String, Object> runtimeVersionMap = new HashMap();
Set<String> clientInfoSet = new HashSet<>();
for (Map.Entry<String, SubscriptionGroupConfig> entry : subscriptionGroupWrapper
.getSubscriptionGroupTable().entrySet()) {
try {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(
entry.getValue().getGroupName());
for (Connection conn : cc.getConnectionSet()) {
String clientInfo = conn.getLanguage() + "%" + MQVersion.getVersionDesc(conn.getVersion());
clientInfoSet.add(clientInfo);
}
} catch (Exception e) {
continue;
}
}
runtimeVersionMap.put("rocketmqVersion", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
runtimeVersionMap.put("clientInfo", clientInfoSet);
return runtimeVersionMap;
}
private Map<String, Object> getRuntimeEnv(KVTable kvTable, Properties properties) {
Map<String, Object> runtimeEnvMap = new HashMap<>();
runtimeEnvMap.put("cpuNum", properties.getProperty("clientCallbackExecutorThreads"));
runtimeEnvMap.put("totalMemKBytes", kvTable.getTable().get("totalMemKBytes"));
return runtimeEnvMap;
}
private Map<String, Object> getRuntimeQuota(KVTable kvTable, DefaultMQAdminExt defaultMQAdminExt, String brokerAddr,
Map<String, Double> totalTpsMap, Map<String, Long> totalOneDayNumMap,
SubscriptionGroupWrapper subscriptionGroupWrapper)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = defaultMQAdminExt.getUserTopicConfig(
brokerAddr, false, 10000);
BrokerStatsData transStatsData = null;
try {
transStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr,
BrokerStatsManager.TOPIC_PUT_NUMS,
TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC);
} catch (MQClientException e) {
}
BrokerStatsData scheduleStatsData = null;
try {
scheduleStatsData = defaultMQAdminExt.viewBrokerStatsData(brokerAddr,
BrokerStatsManager.TOPIC_PUT_NUMS, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
} catch (MQClientException e) {
}
Map<String, Object> runtimeQuotaMap = new HashMap<>();
//disk use ratio
Map<String, Object> diskRatioMap = new HashMap<>();
diskRatioMap.put("commitLogDiskRatio", kvTable.getTable().get("commitLogDiskRatio"));
diskRatioMap.put("consumeQueueDiskRatio", kvTable.getTable().get("consumeQueueDiskRatio"));
runtimeQuotaMap.put("diskRatio", diskRatioMap);
//inTps and outTps
Map<String, Double> tpsMap = new HashMap<>();
double normalInTps = 0;
double normalOutTps = 0;
String putTps = kvTable.getTable().get("putTps");
String getTransferedTps = kvTable.getTable().get("getTransferedTps");
String[] inTpss = putTps.split(" ");
if (inTpss.length > 0) {
normalInTps = Double.parseDouble(inTpss[0]);
}
String[] outTpss = getTransferedTps.split(" ");
if (outTpss.length > 0) {
normalOutTps = Double.parseDouble(outTpss[0]);
}
double transInTps = null != transStatsData ? transStatsData.getStatsMinute().getTps() : 0.0;
double scheduleInTps = null != scheduleStatsData ? scheduleStatsData.getStatsMinute().getTps() : 0.0;
long transOneDayInNum = null != transStatsData ? StatsAllSubCommand.compute24HourSum(transStatsData) : 0;
long scheduleOneDayInNum = null != scheduleStatsData ? StatsAllSubCommand.compute24HourSum(scheduleStatsData) : 0;
//current minute tps
tpsMap.put("normalInTps", normalInTps);
tpsMap.put("normalOutTps", normalOutTps);
tpsMap.put("transInTps", transInTps);
tpsMap.put("scheduleInTps", scheduleInTps);
runtimeQuotaMap.put("tps", tpsMap);
//one day num
Map<String, Long> oneDayNumMap = new HashMap<>();
long normalOneDayInNum = Long.parseLong(kvTable.getTable().get("msgPutTotalTodayMorning")) -
Long.parseLong(kvTable.getTable().get("msgPutTotalYesterdayMorning"));
long normalOneDayOutNum = Long.parseLong(kvTable.getTable().get("msgGetTotalTodayMorning")) -
Long.parseLong(kvTable.getTable().get("msgGetTotalYesterdayMorning"));
oneDayNumMap.put("normalOneDayInNum", normalOneDayInNum);
oneDayNumMap.put("normalOneDayOutNum", normalOneDayOutNum);
oneDayNumMap.put("transOneDayInNum", transOneDayInNum);
oneDayNumMap.put("scheduleOneDayInNum", scheduleOneDayInNum);
runtimeQuotaMap.put("oneDayNum", oneDayNumMap);
//all broker current minute tps
totalTpsMap.put("totalNormalInTps", totalTpsMap.get("totalNormalInTps") + normalInTps);
totalTpsMap.put("totalNormalOutTps", totalTpsMap.get("totalNormalOutTps") + normalOutTps);
totalTpsMap.put("totalTransInTps", totalTpsMap.get("totalTransInTps") + transInTps);
totalTpsMap.put("totalScheduleInTps", totalTpsMap.get("totalScheduleInTps") + scheduleInTps);
//all broker one day num
totalOneDayNumMap.put("normalOneDayInNum", totalOneDayNumMap.get("normalOneDayInNum") + normalOneDayInNum);
totalOneDayNumMap.put("normalOneDayOutNum", totalOneDayNumMap.get("normalOneDayOutNum") + normalOneDayOutNum);
totalOneDayNumMap.put("transOneDayInNum", totalOneDayNumMap.get("transOneDayInNum") + transOneDayInNum);
totalOneDayNumMap.put("scheduleOneDayInNum", totalOneDayNumMap.get("scheduleOneDayInNum") + scheduleOneDayInNum);
// putMessageAverageSize
runtimeQuotaMap.put("messageAverageSize", kvTable.getTable().get("putMessageAverageSize"));
//topicSize
runtimeQuotaMap.put("topicSize", topicConfigSerializeWrapper.getTopicConfigTable().size());
runtimeQuotaMap.put("groupSize", subscriptionGroupWrapper.getSubscriptionGroupTable().size());
return runtimeQuotaMap;
}
private void initTotalMap(Map<String, Double> totalTpsMap, Map<String, Long> totalOneDayNumMap) {
totalTpsMap.put("totalNormalInTps", 0.0);
totalTpsMap.put("totalNormalOutTps", 0.0);
totalTpsMap.put("totalTransInTps", 0.0);
totalTpsMap.put("totalScheduleInTps", 0.0);
totalOneDayNumMap.put("normalOneDayInNum", 0L);
totalOneDayNumMap.put("normalOneDayOutNum", 0L);
totalOneDayNumMap.put("transOneDayInNum", 0L);
totalOneDayNumMap.put("scheduleOneDayInNum", 0L);
}
}
......@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
......@@ -55,6 +56,7 @@ import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
......@@ -249,6 +251,14 @@ public class DefaultMQAdminExtTest {
consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<String, TopicConfig>() {
{
put("topic_test_examine_topicConfig", new TopicConfig("topic_test_examine_topicConfig"));
}
});
when(mQClientAPIImpl.getAllTopicConfig(anyString(),anyLong())).thenReturn(topicConfigSerializeWrapper);
}
@AfterClass
......@@ -464,4 +474,10 @@ public class DefaultMQAdminExtTest {
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(topic1, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, 0), System.currentTimeMillis())).isEqualTo(1011L);
}
@Test
public void testExamineTopicConfig() throws MQBrokerException, RemotingException, InterruptedException {
TopicConfig topicConfig = defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", "topic_test_examine_topicConfig");
assertThat(topicConfig.getTopicName().equals("topic_test_examine_topicConfig"));
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册