提交 ba9ded4b 编写于 作者: Y yukon

Merge branch 'develop' into enhancedTls

# Conflicts:
#	remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
#	remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -27,6 +28,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and
* {@link BrokerController#pullThreadPoolQueue}
*/
public class BrokerFastFailure {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
......@@ -52,8 +57,10 @@ public class BrokerFastFailure {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}
......@@ -75,10 +82,18 @@ public class BrokerFastFailure {
}
}
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
if (!blockingQueue.isEmpty()) {
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
......@@ -88,10 +103,10 @@ public class BrokerFastFailure {
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class BrokerFastFailureTest {
@Test
public void testCleanExpiredRequestInQueue() throws Exception {
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
assertThat(queue.size()).isZero();
//Normal Runnable
Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
queue.add(runnable);
assertThat(queue.size()).isEqualTo(1);
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
assertThat(queue.size()).isEqualTo(1);
queue.clear();
//With expired request
RequestTask expiredRequest = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(expiredRequest, null));
TimeUnit.MILLISECONDS.sleep(100);
RequestTask requestTask = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(requestTask, null));
assertThat(queue.size()).isEqualTo(2);
brokerFastFailure.cleanExpiredRequestInQueue(queue, 100);
assertThat(queue.size()).isEqualTo(1);
assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
}
}
\ No newline at end of file
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -711,8 +711,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// consumeThreadMin
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
|| this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
throw new MQClientException(
"consumeThreadMin Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
......@@ -727,6 +726,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
null);
}
// consumeThreadMin can't be larger than consumeThreadMax
if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
throw new MQClientException(
"consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "
+ "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",
null);
}
// consumeConcurrentlyMaxSpan
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
......
......@@ -48,7 +48,10 @@ public class ProcessQueue {
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
......@@ -243,8 +246,8 @@ public class ProcessQueue {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.putAll(this.msgTreeMapTemp);
this.msgTreeMapTemp.clear();
this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
this.consumingMsgOrderlyTreeMap.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
......@@ -257,12 +260,12 @@ public class ProcessQueue {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
for (MessageExt msg : this.msgTreeMapTemp.values()) {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
this.msgTreeMapTemp.clear();
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
......@@ -281,7 +284,7 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
this.msgTreeMapTemp.remove(msg.getQueueOffset());
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
......@@ -304,7 +307,7 @@ public class ProcessQueue {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
msgTreeMapTemp.put(entry.getKey(), entry.getValue());
consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
......@@ -343,7 +346,7 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.clear();
this.msgTreeMapTemp.clear();
this.consumingMsgOrderlyTreeMap.clear();
this.msgCount.set(0);
this.msgSize.set(0);
this.queueOffsetMax = 0L;
......@@ -402,10 +405,10 @@ public class ProcessQueue {
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
}
if (!this.msgTreeMapTemp.isEmpty()) {
info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
info.setTransactionMsgCount(this.msgTreeMapTemp.size());
if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());
info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey());
info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size());
}
info.setLocked(this.locked);
......
......@@ -975,7 +975,6 @@ public class MQClientInstance {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
FOR_SEG:
for (Map.Entry<Long, String> entry : map.entrySet()) {
Long id = entry.getKey();
brokerAddr = entry.getValue();
......@@ -983,7 +982,6 @@ public class MQClientInstance {
found = true;
if (MixAll.MASTER_ID == id) {
slave = false;
break FOR_SEG;
} else {
slave = true;
}
......
......@@ -454,9 +454,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (tmpmq != null) {
mq = tmpmq;
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class DefaultMQPushConsumerImplTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void checkConfigTest() throws MQClientException {
//test type
thrown.expect(MQClientException.class);
//test message
thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(9);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(" Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null);
defaultMQPushConsumerImpl.start();
}
}
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -106,7 +106,9 @@ public class BrokerConfig {
private boolean disableConsumeIfConsumerReadSlowly = false;
private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
private boolean brokerFastFailureEnable = true;
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
......@@ -163,6 +165,22 @@ public class BrokerConfig {
this.consumerFallbehindThreshold = consumerFallbehindThreshold;
}
public boolean isBrokerFastFailureEnable() {
return brokerFastFailureEnable;
}
public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) {
this.brokerFastFailureEnable = brokerFastFailureEnable;
}
public long getWaitTimeMillsInPullQueue() {
return waitTimeMillsInPullQueue;
}
public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) {
this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
}
public boolean isDisableConsumeIfConsumerReadSlowly() {
return disableConsumeIfConsumerReadSlowly;
}
......
......@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -51,7 +51,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -32,7 +32,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
......@@ -49,15 +48,6 @@ public class NamesrvStartup {
public static NamesrvController main0(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 4096;
}
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 4096;
}
try {
//PackageConflictDetect.detectFastjson();
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -29,7 +29,7 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
......@@ -39,9 +39,9 @@
</prerequisites>
<scm>
<url>https://github.com/apache/rocketmq.git</url>
<connection>scm:git:https://github.com/apache/rocketmq.git</connection>
<developerConnection>scm:git:https://github.com/apache/rocketmq.git</developerConnection>
<url>git@github.com:apache/rocketmq.git</url>
<connection>scm:git:git@github.com:apache/rocketmq.git</connection>
<developerConnection>scm:git:git@github.com:apache/rocketmq.git</developerConnection>
<tag>HEAD</tag>
</scm>
......@@ -252,7 +252,8 @@
<exclude>.travis.yml</exclude>
<exclude>CONTRIBUTING.md</exclude>
<exclude>bin/README.md</exclude>
<exclude>PULL_REQUEST_TEMPLATE.md</exclude>
<exclude>.github/*</exclude>
<exclude>src/test/resources/certs/*</exclude>
</excludes>
</configuration>
</plugin>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -110,6 +110,8 @@ public class DefaultMessageStore implements MessageStore {
private FileLock lock;
boolean shutDownNormal = false;
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
......@@ -265,8 +267,9 @@ public class DefaultMessageStore implements MessageStore {
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
if (this.runningFlags.isWriteable()) {
if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
shutDownNormal = true;
} else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
......
......@@ -570,6 +570,11 @@ public class MappedFile extends ReferenceResource {
log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
//testable
File getFile() {
return this.file;
}
@Override
public String toString() {
return this.fileName;
......
......@@ -367,6 +367,9 @@ public class MappedFileQueue {
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
......@@ -421,7 +424,7 @@ public class MappedFileQueue {
public boolean flush(final int flushLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
......@@ -438,7 +441,7 @@ public class MappedFileQueue {
public boolean commit(final int commitLeastPages) {
boolean result = true;
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.io.File;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMessageStoreShuwDownTest {
private DefaultMessageStore messageStore;
@Before
public void init() throws Exception {
messageStore = spy(buildMessageStore());
boolean load = messageStore.load();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
assertTrue(load);
messageStore.start();
}
@Test
public void testDispatchBehindWhenShutDown() {
messageStore.shutdown();
assertTrue(!messageStore.shutDownNormal);
File file = new File(StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir()));
assertTrue(file.exists());
}
@After
public void destory() {
messageStore.destroy();
File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
UtilAll.deleteFile(file);
}
public DefaultMessageStore buildMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
}
}
......@@ -20,9 +20,7 @@ package org.apache.rocketmq.store;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Test;
......@@ -47,7 +45,7 @@ public class MappedFileQueueTest {
}
@Test
public void test_findMappedFileByOffset() {
public void testFindMappedFileByOffset() {
// four-byte string.
final String fixedMsg = "abcd";
......@@ -97,6 +95,28 @@ public class MappedFileQueueTest {
mappedFileQueue.destroy();
}
@Test
public void testFindMappedFileByOffset_StartOffsetIsNonZero() {
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/b/", 1024, null);
//Start from a non-zero offset
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024);
assertThat(mappedFile).isNotNull();
assertThat(mappedFileQueue.findMappedFileByOffset(1025)).isEqualTo(mappedFile);
assertThat(mappedFileQueue.findMappedFileByOffset(0)).isNull();
assertThat(mappedFileQueue.findMappedFileByOffset(123, false)).isNull();
assertThat(mappedFileQueue.findMappedFileByOffset(123, true)).isEqualTo(mappedFile);
assertThat(mappedFileQueue.findMappedFileByOffset(0, false)).isNull();
assertThat(mappedFileQueue.findMappedFileByOffset(0, true)).isEqualTo(mappedFile);
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
}
@Test
public void testAppendMessage() {
final String fixedMsg = "0123456789abcdef";
......@@ -182,6 +202,33 @@ public class MappedFileQueueTest {
mappedFileQueue.destroy();
}
@Test
public void testDeleteExpiredFileByTime() throws Exception {
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/f/", 1024, null);
for (int i = 0; i < 100; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertThat(mappedFile).isNotNull();
byte[] bytes = new byte[512];
assertThat(mappedFile.appendMessage(bytes)).isTrue();
}
assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(50);
long expiredTime = 100 * 1000;
for (int i = 0; i < mappedFileQueue.getMappedFiles().size(); i++) {
MappedFile mappedFile = mappedFileQueue.getMappedFiles().get(i);
if (i < 5) {
mappedFile.getFile().setLastModified(System.currentTimeMillis() - expiredTime * 2);
}
if (i > 20) {
mappedFile.getFile().setLastModified(System.currentTimeMillis() - expiredTime * 2);
}
}
mappedFileQueue.deleteExpiredFileByTime(expiredTime, 0, 0, false);
assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45);
}
@After
public void destory() {
File file = new File("target/unit_test_store");
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0-SNAPSHOT</version>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册