未验证 提交 b391594b 编写于 作者: Y yukon 提交者: GitHub

Merge pull request #220 from zhouxinyu/ROCKETMQ-332

[ROCKETMQ-332] MappedFileQueue#findMappedFileByOffset is not thread safe, which will cause message loss.
...@@ -461,26 +461,39 @@ public class MappedFileQueue { ...@@ -461,26 +461,39 @@ public class MappedFileQueue {
*/ */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try { try {
MappedFile mappedFile = this.getFirstMappedFile(); MappedFile firstMappedFile = this.getFirstMappedFile();
if (mappedFile != null) { MappedFile lastMappedFile = this.getLastMappedFile();
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); if (firstMappedFile != null && lastMappedFile != null) {
if (index < 0 || index >= this.mappedFiles.size()) { if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " + LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
"mappedFileSize: {}, mappedFiles count: {}",
mappedFile,
offset, offset,
index, firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize, this.mappedFileSize,
this.mappedFiles.size()); this.mappedFiles.size());
} else {
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
} }
try { if (targetFile != null && offset >= targetFile.getFileFromOffset()
return this.mappedFiles.get(index); && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
} catch (Exception e) { return targetFile;
if (returnFirstOnNotFound) { }
return mappedFile;
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
} }
LOG_ERROR.warn("findMappedFileByOffset failure. ", e); }
if (returnFirstOnNotFound) {
return firstMappedFile;
} }
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -229,6 +229,24 @@ public class MappedFileQueueTest { ...@@ -229,6 +229,24 @@ public class MappedFileQueueTest {
assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45); assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45);
} }
@Test
public void testFindMappedFile_ByIteration() {
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/g/", 1024, null);
for (int i =0 ; i < 3; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024 * i);
mappedFile.wrotePosition.set(1024);
}
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
// Switch two MappedFiles and verify findMappedFileByOffset method
MappedFile tmpFile = mappedFileQueue.getMappedFiles().get(1);
mappedFileQueue.getMappedFiles().set(1, mappedFileQueue.getMappedFiles().get(2));
mappedFileQueue.getMappedFiles().set(2, tmpFile);
assertThat(mappedFileQueue.findMappedFileByOffset(1028).getFileFromOffset()).isEqualTo(1024);
}
@After @After
public void destory() { public void destory() {
File file = new File("target/unit_test_store"); File file = new File("target/unit_test_store");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册