提交 c6741374 编写于 作者: Z Zhendong Liu 提交者: von gosling

[ROCKETMQ-320]Message loss when shutdown with dispatch behind (#197)

上级 54592158
......@@ -110,6 +110,8 @@ public class DefaultMessageStore implements MessageStore {
private FileLock lock;
boolean shutDownNormal = false;
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
......@@ -265,8 +267,9 @@ public class DefaultMessageStore implements MessageStore {
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
if (this.runningFlags.isWriteable()) {
if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
shutDownNormal = true;
} else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import java.io.File;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMessageStoreShuwDownTest {
private DefaultMessageStore messageStore;
@Before
public void init() throws Exception {
messageStore = spy(buildMessageStore());
boolean load = messageStore.load();
when(messageStore.dispatchBehindBytes()).thenReturn(100L);
assertTrue(load);
messageStore.start();
}
@Test
public void testDispatchBehindWhenShutDown() {
messageStore.shutdown();
assertTrue(!messageStore.shutDownNormal);
File file = new File(StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir()));
assertTrue(file.exists());
}
@After
public void destory() {
messageStore.destroy();
File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
UtilAll.deleteFile(file);
}
public DefaultMessageStore buildMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册