From 04e97de8a74b346b8d9e386b18c34e28ea2514fe Mon Sep 17 00:00:00 2001 From: yukon Date: Tue, 16 Jan 2018 21:33:12 +0800 Subject: [PATCH] [ROCKETMQ-332] Fix concurrent bug in MappedFileQueue#findMappedFileByOffset, which may cause message loss --- .../rocketmq/store/MappedFileQueue.java | 38 ++++++++++++------- .../rocketmq/store/MappedFileQueueTest.java | 18 +++++++++ 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index 9eb3b3ab..bdb851ba 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -461,26 +461,38 @@ public class MappedFileQueue { */ public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { - MappedFile mappedFile = this.getFirstMappedFile(); - if (mappedFile != null) { - int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); + MappedFile firstMappedFile = this.getFirstMappedFile(); + if (firstMappedFile != null) { + int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); if (index < 0 || index >= this.mappedFiles.size()) { - LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " + - "mappedFileSize: {}, mappedFiles count: {}", - mappedFile, + LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}", + firstMappedFile, offset, index, this.mappedFileSize, this.mappedFiles.size()); - } + } else { + MappedFile targetFile = null; + try { + targetFile = this.mappedFiles.get(index); + } catch (Exception ignored) { + } - try { - return this.mappedFiles.get(index); - } catch (Exception e) { - if (returnFirstOnNotFound) { - return mappedFile; + if (targetFile != null && offset >= targetFile.getFileFromOffset() + && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { + return targetFile; } - LOG_ERROR.warn("findMappedFileByOffset failure. ", e); + + for (MappedFile tmpMappedFile : this.mappedFiles) { + if (offset >= tmpMappedFile.getFileFromOffset() + && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { + return tmpMappedFile; + } + } + } + + if (returnFirstOnNotFound) { + return firstMappedFile; } } } catch (Exception e) { diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java index 92f1876b..8f76051d 100644 --- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java @@ -229,6 +229,24 @@ public class MappedFileQueueTest { assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45); } + @Test + public void testFindMappedFile_ByIteration() { + MappedFileQueue mappedFileQueue = + new MappedFileQueue("target/unit_test_store/g/", 1024, null); + for (int i =0 ; i < 3; i++) { + MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024 * i); + mappedFile.wrotePosition.set(1024); + } + + assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024); + + // Switch two MappedFiles and verify findMappedFileByOffset method + MappedFile tmpFile = mappedFileQueue.getMappedFiles().get(1); + mappedFileQueue.getMappedFiles().set(1, mappedFileQueue.getMappedFiles().get(2)); + mappedFileQueue.getMappedFiles().set(2, tmpFile); + assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024); + } + @After public void destory() { File file = new File("target/unit_test_store"); -- GitLab