From d1fa8694ff82b429dcd811156edf7ea8a702237e Mon Sep 17 00:00:00 2001 From: shtykh_roman Date: Mon, 26 Dec 2016 18:32:00 +0900 Subject: [PATCH] [ROCKETMQ-9] Errors in rocketmq-store module. JIRA issue: https://issues.apache.org/jira/browse/ROCKETMQ-9 --- .../store/AllocateMappedFileService.java | 2 +- .../rocketmq/store/MappedFileQueue.java | 26 ++++++++++------ .../rocketmq/store/index/IndexFile.java | 5 ++- .../rocketmq/store/index/IndexService.java | 31 ++++++++++--------- .../rocketmq/store/MappedFileQueueTest.java | 17 ++++++++-- .../rocketmq/store/index/IndexFileTest.java | 18 ++++++----- .../src/test/resources/logback-test.xml | 2 +- 7 files changed, 63 insertions(+), 38 deletions(-) diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java index 40eee7a6..06113c82 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java @@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread { isSuccess = true; } } catch (InterruptedException e) { - log.warn(this.getServiceName() + " service has exception, maybe by shutdown"); + log.warn(this.getServiceName() + " interrupted, possibly by shutdown."); this.hasException = true; return false; } catch (IOException e) { diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java index 2b006c0e..8d9d3ab0 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java @@ -459,27 +459,35 @@ public class MappedFileQueue { return result; } - + /** + * Finds a mapped file by offset. + * + * @param offset Offset. + * @param returnFirstOnNotFound If the mapped file is not found, then return the first one. + * @return Mapped file or null (when not found and returnFirstOnNotFound is false). + */ public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { MappedFile mappedFile = this.getFirstMappedFile(); if (mappedFile != null) { int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize)); if (index < 0 || index >= this.mappedFiles.size()) { - LOG_ERROR.warn("findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}", - offset, - index, - this.mappedFileSize, - this.mappedFiles.size(), - UtilAll.currentStackTrace()); + LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " + + "mappedFileSize: {}, mappedFiles count: {}", + mappedFile, + offset, + index, + this.mappedFileSize, + this.mappedFiles.size()); } try { return this.mappedFiles.get(index); } catch (Exception e) { - if (returnFirstOnNotFound) { + if (returnFirstOnNotFound) return mappedFile; - } + + LOG_ERROR.warn("findMappedFileByOffset failure. {}", UtilAll.currentStackTrace()); } } } catch (Exception e) { diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java index f3533201..befa5f9a 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java @@ -94,7 +94,6 @@ public class IndexFile { return this.indexHeader.getIndexCount() >= this.indexNum; } - public boolean destroy(final long intervalForcibly) { return this.mappedFile.destroy(intervalForcibly); } @@ -167,8 +166,8 @@ public class IndexFile { } } } else { - log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num " - + this.indexNum); + log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + + "; index max num = " + this.indexNum); } return false; diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java index f275f808..fded7473 100644 --- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java +++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java @@ -49,6 +49,9 @@ public class IndexService { private final ArrayList indexFileList = new ArrayList(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + /** Maximum times to attempt index file creation. */ + private static final int MAX_TRY_IDX_CREATE = 3; + public IndexService(final DefaultMessageStore store) { this.defaultMessageStore = store; @@ -257,44 +260,44 @@ public class IndexService { private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) { - for (boolean ok = - indexFile.putKey(idxKey, msg.getCommitLogOffset(), - msg.getStoreTimestamp()); !ok; ) { - log.warn("index file full, so create another one, " + indexFile.getFileName()); + for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) { + log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one"); + indexFile = retryGetAndCreateIndexFile(); if (null == indexFile) { return null; } - ok = - indexFile.putKey(idxKey, msg.getCommitLogOffset(), - msg.getStoreTimestamp()); + ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); } + return indexFile; } - - public IndexFile retryGetAndCreateIndexFile() { + /** + * Retries to get or create index file. + * + * @return {@link IndexFile} or null on failure. + */ + private IndexFile retryGetAndCreateIndexFile() { IndexFile indexFile = null; - - for (int times = 0; null == indexFile && times < 3; times++) { + for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) { indexFile = this.getAndCreateLastIndexFile(); if (null != indexFile) break; try { - log.error("try to create index file, " + times + " times"); + log.error("Tried to create index file " + times + " times"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } - if (null == indexFile) { this.defaultMessageStore.getAccessRights().makeIndexFileError(); - log.error("mark index file can not build flag"); + log.error("Mark index file cannot build flag"); } return indexFile; diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java index 89b37be1..700f1c60 100644 --- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java @@ -52,6 +52,7 @@ public class MappedFileQueueTest { @Test public void test_getLastMapedFile() { final String fixedMsg = "0123456789abcdef"; + logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/a/", 1024, null); @@ -59,6 +60,7 @@ public class MappedFileQueueTest { for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); if (!result) { logger.debug("appendMessage " + i); @@ -74,7 +76,9 @@ public class MappedFileQueueTest { @Test public void test_findMapedFileByOffset() { + // four-byte string. final String fixedMsg = "abcd"; + logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/b/", 1024, null); @@ -82,11 +86,13 @@ public class MappedFileQueueTest { for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); - // logger.debug("appendMessage " + bytes); assertTrue(result); } + assertEquals(fixedMsg.getBytes().length * 1024, mappedFileQueue.getMappedMemorySize()); + MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 0); @@ -110,7 +116,8 @@ public class MappedFileQueueTest { mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100); assertTrue(mappedFile != null); assertEquals(mappedFile.getFileFromOffset(), 1024 * 2); - + + // over mapped memory size. mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4); assertTrue(mappedFile == null); @@ -125,6 +132,7 @@ public class MappedFileQueueTest { @Test public void test_commit() { final String fixedMsg = "0123456789abcdef"; + logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/c/", 1024, null); @@ -132,6 +140,7 @@ public class MappedFileQueueTest { for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); assertTrue(result); } @@ -168,6 +177,7 @@ public class MappedFileQueueTest { @Test public void test_getMapedMemorySize() { final String fixedMsg = "abcd"; + logger.debug("================================================================"); MappedFileQueue mappedFileQueue = new MappedFileQueue("target/unit_test_store/d/", 1024, null); @@ -175,14 +185,15 @@ public class MappedFileQueueTest { for (int i = 0; i < 1024; i++) { MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0); assertTrue(mappedFile != null); + boolean result = mappedFile.appendMessage(fixedMsg.getBytes()); assertTrue(result); } assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize()); + mappedFileQueue.shutdown(1000); mappedFileQueue.destroy(); logger.debug("MappedFileQueue.getMappedMemorySize() OK"); } - } diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java index f6bfc0a6..9e446f7c 100644 --- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java +++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java @@ -31,17 +31,18 @@ import static org.junit.Assert.assertTrue; public class IndexFileTest { - private static final int hashSlotNum = 100; - private static final int indexNum = 400; + private static final int HASH_SLOT_NUM = 100; + private static final int INDEX_NUM = 400; @Test public void test_put_index() throws Exception { - IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0); - for (long i = 0; i < (indexNum - 1); i++) { + IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0); + for (long i = 0; i < (INDEX_NUM - 1); i++) { boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); assertTrue(putResult); } - + + // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); assertFalse(putResult); @@ -51,12 +52,14 @@ public class IndexFileTest { @Test public void test_put_get_index() throws Exception { - IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0); + IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0); - for (long i = 0; i < (indexNum - 1); i++) { + for (long i = 0; i < (INDEX_NUM - 1); i++) { boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis()); assertTrue(putResult); } + + // put over index file capacity. boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis()); assertFalse(putResult); @@ -64,6 +67,7 @@ public class IndexFileTest { indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true); assertFalse(phyOffsets.isEmpty()); assertEquals(1, phyOffsets.size()); + indexFile.destroy(0); } } diff --git a/rocketmq-store/src/test/resources/logback-test.xml b/rocketmq-store/src/test/resources/logback-test.xml index acdfa10b..11d429df 100644 --- a/rocketmq-store/src/test/resources/logback-test.xml +++ b/rocketmq-store/src/test/resources/logback-test.xml @@ -27,7 +27,7 @@ - + -- GitLab