diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
index 40eee7a660390999ef5ff9557e310a3aa5986de4..06113c82c1095be21e2f8c8cbe92e262b26dc02d 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
@@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread {
isSuccess = true;
}
} catch (InterruptedException e) {
- log.warn(this.getServiceName() + " service has exception, maybe by shutdown");
+ log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
index 2b006c0e533d044ed0bd57cb070f9c6e1724a205..8d9d3ab0b4a7707d4b751cc55ad1869eca05067e 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
@@ -459,27 +459,35 @@ public class MappedFileQueue {
return result;
}
-
+ /**
+ * Finds a mapped file by offset.
+ *
+ * @param offset Offset.
+ * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
+ * @return Mapped file or null (when not found and returnFirstOnNotFound is false
).
+ */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
if (index < 0 || index >= this.mappedFiles.size()) {
- LOG_ERROR.warn("findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}",
- offset,
- index,
- this.mappedFileSize,
- this.mappedFiles.size(),
- UtilAll.currentStackTrace());
+ LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
+ "mappedFileSize: {}, mappedFiles count: {}",
+ mappedFile,
+ offset,
+ index,
+ this.mappedFileSize,
+ this.mappedFiles.size());
}
try {
return this.mappedFiles.get(index);
} catch (Exception e) {
- if (returnFirstOnNotFound) {
+ if (returnFirstOnNotFound)
return mappedFile;
- }
+
+ LOG_ERROR.warn("findMappedFileByOffset failure. {}", UtilAll.currentStackTrace());
}
}
} catch (Exception e) {
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
index f35332014eb5da3ea806331b3c6015123e703b11..befa5f9aa82507e7e72dbd8f3a76fe38dfb67232 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
@@ -94,7 +94,6 @@ public class IndexFile {
return this.indexHeader.getIndexCount() >= this.indexNum;
}
-
public boolean destroy(final long intervalForcibly) {
return this.mappedFile.destroy(intervalForcibly);
}
@@ -167,8 +166,8 @@ public class IndexFile {
}
}
} else {
- log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
- + this.indexNum);
+ log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ + "; index max num = " + this.indexNum);
}
return false;
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
index f275f80875da980a34cfff3863519c4efe7a7e90..fded74734c29abfb6854ccc4562ac6586a2fc4bf 100644
--- a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -49,6 +49,9 @@ public class IndexService {
private final ArrayList indexFileList = new ArrayList();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ /** Maximum times to attempt index file creation. */
+ private static final int MAX_TRY_IDX_CREATE = 3;
+
public IndexService(final DefaultMessageStore store) {
this.defaultMessageStore = store;
@@ -257,44 +260,44 @@ public class IndexService {
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
- for (boolean ok =
- indexFile.putKey(idxKey, msg.getCommitLogOffset(),
- msg.getStoreTimestamp()); !ok; ) {
- log.warn("index file full, so create another one, " + indexFile.getFileName());
+ for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
+ log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
+
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
- ok =
- indexFile.putKey(idxKey, msg.getCommitLogOffset(),
- msg.getStoreTimestamp());
+ ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
+
return indexFile;
}
-
- public IndexFile retryGetAndCreateIndexFile() {
+ /**
+ * Retries to get or create index file.
+ *
+ * @return {@link IndexFile} or null on failure.
+ */
+ private IndexFile retryGetAndCreateIndexFile() {
IndexFile indexFile = null;
-
- for (int times = 0; null == indexFile && times < 3; times++) {
+ for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
indexFile = this.getAndCreateLastIndexFile();
if (null != indexFile)
break;
try {
- log.error("try to create index file, " + times + " times");
+ log.error("Tried to create index file " + times + " times");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
-
if (null == indexFile) {
this.defaultMessageStore.getAccessRights().makeIndexFileError();
- log.error("mark index file can not build flag");
+ log.error("Mark index file cannot build flag");
}
return indexFile;
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
index 89b37be1019d2c4fc82ee11ac376e4b46f8cb4ac..700f1c604eb4e46ea7ae10beb49329f20858a00d 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
@@ -52,6 +52,7 @@ public class MappedFileQueueTest {
@Test
public void test_getLastMapedFile() {
final String fixedMsg = "0123456789abcdef";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/a/", 1024, null);
@@ -59,6 +60,7 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
if (!result) {
logger.debug("appendMessage " + i);
@@ -74,7 +76,9 @@ public class MappedFileQueueTest {
@Test
public void test_findMapedFileByOffset() {
+ // four-byte string.
final String fixedMsg = "abcd";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/b/", 1024, null);
@@ -82,11 +86,13 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
- // logger.debug("appendMessage " + bytes);
assertTrue(result);
}
+ assertEquals(fixedMsg.getBytes().length * 1024, mappedFileQueue.getMappedMemorySize());
+
MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
@@ -110,7 +116,8 @@ public class MappedFileQueueTest {
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
-
+
+ // over mapped memory size.
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4);
assertTrue(mappedFile == null);
@@ -125,6 +132,7 @@ public class MappedFileQueueTest {
@Test
public void test_commit() {
final String fixedMsg = "0123456789abcdef";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/c/", 1024, null);
@@ -132,6 +140,7 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
assertTrue(result);
}
@@ -168,6 +177,7 @@ public class MappedFileQueueTest {
@Test
public void test_getMapedMemorySize() {
final String fixedMsg = "abcd";
+
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/d/", 1024, null);
@@ -175,14 +185,15 @@ public class MappedFileQueueTest {
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
+
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
assertTrue(result);
}
assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
+
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.getMappedMemorySize() OK");
}
-
}
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
index f6bfc0a660a4d6634fcfab81c166272e411fc0ca..9e446f7c1a1c9fef09a9d7787bfa439da18856ca 100644
--- a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
@@ -31,17 +31,18 @@ import static org.junit.Assert.assertTrue;
public class IndexFileTest {
- private static final int hashSlotNum = 100;
- private static final int indexNum = 400;
+ private static final int HASH_SLOT_NUM = 100;
+ private static final int INDEX_NUM = 400;
@Test
public void test_put_index() throws Exception {
- IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
- for (long i = 0; i < (indexNum - 1); i++) {
+ IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
+ for (long i = 0; i < (INDEX_NUM - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
-
+
+ // put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
@@ -51,12 +52,14 @@ public class IndexFileTest {
@Test
public void test_put_get_index() throws Exception {
- IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
+ IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
- for (long i = 0; i < (indexNum - 1); i++) {
+ for (long i = 0; i < (INDEX_NUM - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
}
+
+ // put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
@@ -64,6 +67,7 @@ public class IndexFileTest {
indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
assertFalse(phyOffsets.isEmpty());
assertEquals(1, phyOffsets.size());
+
indexFile.destroy(0);
}
}
diff --git a/rocketmq-store/src/test/resources/logback-test.xml b/rocketmq-store/src/test/resources/logback-test.xml
index acdfa10b2677bc48a8f6501b805d1269e544bf5b..11d429dfd8c80be79bf3c1811e0e9df96c65404e 100644
--- a/rocketmq-store/src/test/resources/logback-test.xml
+++ b/rocketmq-store/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
-
+