Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
d1fa8694
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
d1fa8694
编写于
12月 26, 2016
作者:
S
shtykh_roman
提交者:
Willem Jiang
12月 27, 2016
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ROCKETMQ-9] Errors in rocketmq-store module.
JIRA issue:
https://issues.apache.org/jira/browse/ROCKETMQ-9
上级
fed09763
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
63 addition
and
38 deletion
+63
-38
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
...com/alibaba/rocketmq/store/AllocateMappedFileService.java
+1
-1
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
...main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
+17
-9
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
...main/java/com/alibaba/rocketmq/store/index/IndexFile.java
+2
-3
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
...n/java/com/alibaba/rocketmq/store/index/IndexService.java
+17
-14
rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
.../java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
+14
-3
rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
.../java/com/alibaba/rocketmq/store/index/IndexFileTest.java
+11
-7
rocketmq-store/src/test/resources/logback-test.xml
rocketmq-store/src/test/resources/logback-test.xml
+1
-1
未找到文件。
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/AllocateMappedFileService.java
浏览文件 @
d1fa8694
...
...
@@ -213,7 +213,7 @@ public class AllocateMappedFileService extends ServiceThread {
isSuccess
=
true
;
}
}
catch
(
InterruptedException
e
)
{
log
.
warn
(
this
.
getServiceName
()
+
"
service has exception, maybe by shutdown
"
);
log
.
warn
(
this
.
getServiceName
()
+
"
interrupted, possibly by shutdown.
"
);
this
.
hasException
=
true
;
return
false
;
}
catch
(
IOException
e
)
{
...
...
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/MappedFileQueue.java
浏览文件 @
d1fa8694
...
...
@@ -459,27 +459,35 @@ public class MappedFileQueue {
return
result
;
}
/**
* Finds a mapped file by offset.
*
* @param offset Offset.
* @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
*/
public
MappedFile
findMappedFileByOffset
(
final
long
offset
,
final
boolean
returnFirstOnNotFound
)
{
try
{
MappedFile
mappedFile
=
this
.
getFirstMappedFile
();
if
(
mappedFile
!=
null
)
{
int
index
=
(
int
)
((
offset
/
this
.
mappedFileSize
)
-
(
mappedFile
.
getFileFromOffset
()
/
this
.
mappedFileSize
));
if
(
index
<
0
||
index
>=
this
.
mappedFiles
.
size
())
{
LOG_ERROR
.
warn
(
"findMappedFileByOffset offset not matched, request Offset: {}, index: {}, mappedFileSize: {}, mappedFiles count: {}, StackTrace: {}"
,
LOG_ERROR
.
warn
(
"Offset for {} not matched. Request offset: {}, index: {}, "
+
"mappedFileSize: {}, mappedFiles count: {}"
,
mappedFile
,
offset
,
index
,
this
.
mappedFileSize
,
this
.
mappedFiles
.
size
(),
UtilAll
.
currentStackTrace
());
this
.
mappedFiles
.
size
());
}
try
{
return
this
.
mappedFiles
.
get
(
index
);
}
catch
(
Exception
e
)
{
if
(
returnFirstOnNotFound
)
{
if
(
returnFirstOnNotFound
)
return
mappedFile
;
}
LOG_ERROR
.
warn
(
"findMappedFileByOffset failure. {}"
,
UtilAll
.
currentStackTrace
());
}
}
}
catch
(
Exception
e
)
{
...
...
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
浏览文件 @
d1fa8694
...
...
@@ -94,7 +94,6 @@ public class IndexFile {
return
this
.
indexHeader
.
getIndexCount
()
>=
this
.
indexNum
;
}
public
boolean
destroy
(
final
long
intervalForcibly
)
{
return
this
.
mappedFile
.
destroy
(
intervalForcibly
);
}
...
...
@@ -167,8 +166,8 @@ public class IndexFile {
}
}
}
else
{
log
.
warn
(
"
putKey index count "
+
this
.
indexHeader
.
getIndexCount
()
+
" index max num "
+
this
.
indexNum
);
log
.
warn
(
"
Over index file capacity: index count = "
+
this
.
indexHeader
.
getIndexCount
()
+
"; index max num = "
+
this
.
indexNum
);
}
return
false
;
...
...
rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
浏览文件 @
d1fa8694
...
...
@@ -49,6 +49,9 @@ public class IndexService {
private
final
ArrayList
<
IndexFile
>
indexFileList
=
new
ArrayList
<
IndexFile
>();
private
final
ReadWriteLock
readWriteLock
=
new
ReentrantReadWriteLock
();
/** Maximum times to attempt index file creation. */
private
static
final
int
MAX_TRY_IDX_CREATE
=
3
;
public
IndexService
(
final
DefaultMessageStore
store
)
{
this
.
defaultMessageStore
=
store
;
...
...
@@ -257,44 +260,44 @@ public class IndexService {
private
IndexFile
putKey
(
IndexFile
indexFile
,
DispatchRequest
msg
,
String
idxKey
)
{
for
(
boolean
ok
=
indexFile
.
putKey
(
idxKey
,
msg
.
getCommitLogOffset
(),
msg
.
getStoreTimestamp
());
!
ok
;
)
{
log
.
warn
(
"index file full, so create another one, "
+
indexFile
.
getFileName
());
for
(
boolean
ok
=
indexFile
.
putKey
(
idxKey
,
msg
.
getCommitLogOffset
(),
msg
.
getStoreTimestamp
());
!
ok
;
)
{
log
.
warn
(
"Index file ["
+
indexFile
.
getFileName
()
+
"] is full, trying to create another one"
);
indexFile
=
retryGetAndCreateIndexFile
();
if
(
null
==
indexFile
)
{
return
null
;
}
ok
=
indexFile
.
putKey
(
idxKey
,
msg
.
getCommitLogOffset
(),
msg
.
getStoreTimestamp
());
ok
=
indexFile
.
putKey
(
idxKey
,
msg
.
getCommitLogOffset
(),
msg
.
getStoreTimestamp
());
}
return
indexFile
;
}
public
IndexFile
retryGetAndCreateIndexFile
()
{
/**
* Retries to get or create index file.
*
* @return {@link IndexFile} or null on failure.
*/
private
IndexFile
retryGetAndCreateIndexFile
()
{
IndexFile
indexFile
=
null
;
for
(
int
times
=
0
;
null
==
indexFile
&&
times
<
3
;
times
++)
{
for
(
int
times
=
0
;
null
==
indexFile
&&
times
<
MAX_TRY_IDX_CREATE
;
times
++)
{
indexFile
=
this
.
getAndCreateLastIndexFile
();
if
(
null
!=
indexFile
)
break
;
try
{
log
.
error
(
"
try to create index file,
"
+
times
+
" times"
);
log
.
error
(
"
Tried to create index file
"
+
times
+
" times"
);
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
if
(
null
==
indexFile
)
{
this
.
defaultMessageStore
.
getAccessRights
().
makeIndexFileError
();
log
.
error
(
"
mark index file can
not build flag"
);
log
.
error
(
"
Mark index file can
not build flag"
);
}
return
indexFile
;
...
...
rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
浏览文件 @
d1fa8694
...
...
@@ -52,6 +52,7 @@ public class MappedFileQueueTest {
@Test
public
void
test_getLastMapedFile
()
{
final
String
fixedMsg
=
"0123456789abcdef"
;
logger
.
debug
(
"================================================================"
);
MappedFileQueue
mappedFileQueue
=
new
MappedFileQueue
(
"target/unit_test_store/a/"
,
1024
,
null
);
...
...
@@ -59,6 +60,7 @@ public class MappedFileQueueTest {
for
(
int
i
=
0
;
i
<
1024
;
i
++)
{
MappedFile
mappedFile
=
mappedFileQueue
.
getLastMappedFile
(
0
);
assertTrue
(
mappedFile
!=
null
);
boolean
result
=
mappedFile
.
appendMessage
(
fixedMsg
.
getBytes
());
if
(!
result
)
{
logger
.
debug
(
"appendMessage "
+
i
);
...
...
@@ -74,7 +76,9 @@ public class MappedFileQueueTest {
@Test
public
void
test_findMapedFileByOffset
()
{
// four-byte string.
final
String
fixedMsg
=
"abcd"
;
logger
.
debug
(
"================================================================"
);
MappedFileQueue
mappedFileQueue
=
new
MappedFileQueue
(
"target/unit_test_store/b/"
,
1024
,
null
);
...
...
@@ -82,11 +86,13 @@ public class MappedFileQueueTest {
for
(
int
i
=
0
;
i
<
1024
;
i
++)
{
MappedFile
mappedFile
=
mappedFileQueue
.
getLastMappedFile
(
0
);
assertTrue
(
mappedFile
!=
null
);
boolean
result
=
mappedFile
.
appendMessage
(
fixedMsg
.
getBytes
());
// logger.debug("appendMessage " + bytes);
assertTrue
(
result
);
}
assertEquals
(
fixedMsg
.
getBytes
().
length
*
1024
,
mappedFileQueue
.
getMappedMemorySize
());
MappedFile
mappedFile
=
mappedFileQueue
.
findMappedFileByOffset
(
0
);
assertTrue
(
mappedFile
!=
null
);
assertEquals
(
mappedFile
.
getFileFromOffset
(),
0
);
...
...
@@ -111,6 +117,7 @@ public class MappedFileQueueTest {
assertTrue
(
mappedFile
!=
null
);
assertEquals
(
mappedFile
.
getFileFromOffset
(),
1024
*
2
);
// over mapped memory size.
mappedFile
=
mappedFileQueue
.
findMappedFileByOffset
(
1024
*
4
);
assertTrue
(
mappedFile
==
null
);
...
...
@@ -125,6 +132,7 @@ public class MappedFileQueueTest {
@Test
public
void
test_commit
()
{
final
String
fixedMsg
=
"0123456789abcdef"
;
logger
.
debug
(
"================================================================"
);
MappedFileQueue
mappedFileQueue
=
new
MappedFileQueue
(
"target/unit_test_store/c/"
,
1024
,
null
);
...
...
@@ -132,6 +140,7 @@ public class MappedFileQueueTest {
for
(
int
i
=
0
;
i
<
1024
;
i
++)
{
MappedFile
mappedFile
=
mappedFileQueue
.
getLastMappedFile
(
0
);
assertTrue
(
mappedFile
!=
null
);
boolean
result
=
mappedFile
.
appendMessage
(
fixedMsg
.
getBytes
());
assertTrue
(
result
);
}
...
...
@@ -168,6 +177,7 @@ public class MappedFileQueueTest {
@Test
public
void
test_getMapedMemorySize
()
{
final
String
fixedMsg
=
"abcd"
;
logger
.
debug
(
"================================================================"
);
MappedFileQueue
mappedFileQueue
=
new
MappedFileQueue
(
"target/unit_test_store/d/"
,
1024
,
null
);
...
...
@@ -175,14 +185,15 @@ public class MappedFileQueueTest {
for
(
int
i
=
0
;
i
<
1024
;
i
++)
{
MappedFile
mappedFile
=
mappedFileQueue
.
getLastMappedFile
(
0
);
assertTrue
(
mappedFile
!=
null
);
boolean
result
=
mappedFile
.
appendMessage
(
fixedMsg
.
getBytes
());
assertTrue
(
result
);
}
assertEquals
(
fixedMsg
.
length
()
*
1024
,
mappedFileQueue
.
getMappedMemorySize
());
mappedFileQueue
.
shutdown
(
1000
);
mappedFileQueue
.
destroy
();
logger
.
debug
(
"MappedFileQueue.getMappedMemorySize() OK"
);
}
}
rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
浏览文件 @
d1fa8694
...
...
@@ -31,17 +31,18 @@ import static org.junit.Assert.assertTrue;
public
class
IndexFileTest
{
private
static
final
int
hashSlotNum
=
100
;
private
static
final
int
indexNum
=
400
;
private
static
final
int
HASH_SLOT_NUM
=
100
;
private
static
final
int
INDEX_NUM
=
400
;
@Test
public
void
test_put_index
()
throws
Exception
{
IndexFile
indexFile
=
new
IndexFile
(
"100"
,
hashSlotNum
,
indexNum
,
0
,
0
);
for
(
long
i
=
0
;
i
<
(
indexNum
-
1
);
i
++)
{
IndexFile
indexFile
=
new
IndexFile
(
"100"
,
HASH_SLOT_NUM
,
INDEX_NUM
,
0
,
0
);
for
(
long
i
=
0
;
i
<
(
INDEX_NUM
-
1
);
i
++)
{
boolean
putResult
=
indexFile
.
putKey
(
Long
.
toString
(
i
),
i
,
System
.
currentTimeMillis
());
assertTrue
(
putResult
);
}
// put over index file capacity.
boolean
putResult
=
indexFile
.
putKey
(
Long
.
toString
(
400
),
400
,
System
.
currentTimeMillis
());
assertFalse
(
putResult
);
...
...
@@ -51,12 +52,14 @@ public class IndexFileTest {
@Test
public
void
test_put_get_index
()
throws
Exception
{
IndexFile
indexFile
=
new
IndexFile
(
"200"
,
hashSlotNum
,
indexNum
,
0
,
0
);
IndexFile
indexFile
=
new
IndexFile
(
"200"
,
HASH_SLOT_NUM
,
INDEX_NUM
,
0
,
0
);
for
(
long
i
=
0
;
i
<
(
indexNum
-
1
);
i
++)
{
for
(
long
i
=
0
;
i
<
(
INDEX_NUM
-
1
);
i
++)
{
boolean
putResult
=
indexFile
.
putKey
(
Long
.
toString
(
i
),
i
,
System
.
currentTimeMillis
());
assertTrue
(
putResult
);
}
// put over index file capacity.
boolean
putResult
=
indexFile
.
putKey
(
Long
.
toString
(
400
),
400
,
System
.
currentTimeMillis
());
assertFalse
(
putResult
);
...
...
@@ -64,6 +67,7 @@ public class IndexFileTest {
indexFile
.
selectPhyOffset
(
phyOffsets
,
"60"
,
10
,
0
,
Long
.
MAX_VALUE
,
true
);
assertFalse
(
phyOffsets
.
isEmpty
());
assertEquals
(
1
,
phyOffsets
.
size
());
indexFile
.
destroy
(
0
);
}
}
rocketmq-store/src/test/resources/logback-test.xml
浏览文件 @
d1fa8694
...
...
@@ -27,7 +27,7 @@
<appender-ref
ref=
"STDOUT"
/>
</logger>
<root
level=
"
WARN
"
>
<root
level=
"
ERROR
"
>
<appender-ref
ref=
"STDOUT"
/>
</root>
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录