Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
da73a1d1
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 3 年多
通知
275
Star
16140
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
da73a1d1
编写于
10月 19, 2018
作者:
S
suiyuzeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
log recover issue
上级
a220364b
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
288 addition
and
151 deletion
+288
-151
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+102
-90
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+68
-61
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
...va/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+118
-0
未找到文件。
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
da73a1d1
...
...
@@ -23,6 +23,7 @@ import java.util.List;
import
java.util.Map
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.common.ServiceThread
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
...
...
@@ -65,7 +66,7 @@ public class CommitLog {
public
CommitLog
(
final
DefaultMessageStore
defaultMessageStore
)
{
this
.
mappedFileQueue
=
new
MappedFileQueue
(
defaultMessageStore
.
getMessageStoreConfig
().
getStorePathCommitLog
(),
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
(),
defaultMessageStore
.
getAllocateMappedFileService
());
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
(),
defaultMessageStore
.
getAllocateMappedFileService
());
this
.
defaultMessageStore
=
defaultMessageStore
;
if
(
FlushDiskType
.
SYNC_FLUSH
==
defaultMessageStore
.
getMessageStoreConfig
().
getFlushDiskType
())
{
...
...
@@ -128,10 +129,10 @@ public class CommitLog {
}
public
int
deleteExpiredFile
(
final
long
expiredTime
,
final
int
deleteFilesInterval
,
final
long
intervalForcibly
,
final
boolean
cleanImmediately
final
long
expiredTime
,
final
int
deleteFilesInterval
,
final
long
intervalForcibly
,
final
boolean
cleanImmediately
)
{
return
this
.
mappedFileQueue
.
deleteExpiredFileByTime
(
expiredTime
,
deleteFilesInterval
,
intervalForcibly
,
cleanImmediately
);
}
...
...
@@ -158,7 +159,7 @@ public class CommitLog {
/**
* When the normal exit, data recovery, all memory data have been flush
*/
public
void
recoverNormally
()
{
public
void
recoverNormally
(
long
maxPhyOffsetOfConsumeQueue
)
{
boolean
checkCRCOnRecover
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isCheckCRCOnRecover
();
final
List
<
MappedFile
>
mappedFiles
=
this
.
mappedFileQueue
.
getMappedFiles
();
if
(!
mappedFiles
.
isEmpty
())
{
...
...
@@ -171,6 +172,7 @@ public class CommitLog {
ByteBuffer
byteBuffer
=
mappedFile
.
sliceByteBuffer
();
long
processOffset
=
mappedFile
.
getFileFromOffset
();
long
mappedFileOffset
=
0
;
boolean
isDamaged
=
false
;
while
(
true
)
{
DispatchRequest
dispatchRequest
=
this
.
checkMessageAndReturnSize
(
byteBuffer
,
checkCRCOnRecover
);
int
size
=
dispatchRequest
.
getMsgSize
();
...
...
@@ -198,6 +200,7 @@ public class CommitLog {
// Intermediate file read error
else
if
(!
dispatchRequest
.
isSuccess
())
{
log
.
info
(
"recover physics file end, "
+
mappedFile
.
getFileName
());
isDamaged
=
true
;
break
;
}
}
...
...
@@ -206,6 +209,12 @@ public class CommitLog {
this
.
mappedFileQueue
.
setFlushedWhere
(
processOffset
);
this
.
mappedFileQueue
.
setCommittedWhere
(
processOffset
);
this
.
mappedFileQueue
.
truncateDirtyFiles
(
processOffset
);
// Clear ConsumeQueue redundant data
if
(
isDamaged
&&
maxPhyOffsetOfConsumeQueue
>=
processOffset
)
{
log
.
warn
(
"maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files"
,
maxPhyOffsetOfConsumeQueue
,
processOffset
);
this
.
defaultMessageStore
.
truncateDirtyLogicFiles
(
processOffset
);
}
}
}
...
...
@@ -225,7 +234,7 @@ public class CommitLog {
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
public
DispatchRequest
checkMessageAndReturnSize
(
java
.
nio
.
ByteBuffer
byteBuffer
,
final
boolean
checkCRC
,
final
boolean
readBody
)
{
final
boolean
readBody
)
{
try
{
// 1 TOTAL SIZE
int
totalSize
=
byteBuffer
.
getInt
();
...
...
@@ -321,7 +330,7 @@ public class CommitLog {
if
(
delayLevel
>
0
)
{
tagsCode
=
this
.
defaultMessageStore
.
getScheduleMessageService
().
computeDeliverTimestamp
(
delayLevel
,
storeTimestamp
);
storeTimestamp
);
}
}
}
...
...
@@ -335,24 +344,24 @@ public class CommitLog {
doNothingForDeadCode
(
byteBuffer1
);
doNothingForDeadCode
(
byteBuffer2
);
log
.
error
(
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}"
,
totalSize
,
readLength
,
bodyLen
,
topicLen
,
propertiesLength
);
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}"
,
totalSize
,
readLength
,
bodyLen
,
topicLen
,
propertiesLength
);
return
new
DispatchRequest
(
totalSize
,
false
/* success */
);
}
return
new
DispatchRequest
(
topic
,
queueId
,
physicOffset
,
totalSize
,
tagsCode
,
storeTimestamp
,
queueOffset
,
keys
,
uniqKey
,
sysFlag
,
preparedTransactionOffset
,
propertiesMap
topic
,
queueId
,
physicOffset
,
totalSize
,
tagsCode
,
storeTimestamp
,
queueOffset
,
keys
,
uniqKey
,
sysFlag
,
preparedTransactionOffset
,
propertiesMap
);
}
catch
(
Exception
e
)
{
}
...
...
@@ -362,23 +371,23 @@ public class CommitLog {
private
static
int
calMsgLength
(
int
bodyLength
,
int
topicLength
,
int
propertiesLength
)
{
final
int
msgLen
=
4
//TOTALSIZE
+
4
//MAGICCODE
+
4
//BODYCRC
+
4
//QUEUEID
+
4
//FLAG
+
8
//QUEUEOFFSET
+
8
//PHYSICALOFFSET
+
4
//SYSFLAG
+
8
//BORNTIMESTAMP
+
8
//BORNHOST
+
8
//STORETIMESTAMP
+
8
//STOREHOSTADDRESS
+
4
//RECONSUMETIMES
+
8
//Prepared Transaction Offset
+
4
+
(
bodyLength
>
0
?
bodyLength
:
0
)
//BODY
+
1
+
topicLength
//TOPIC
+
2
+
(
propertiesLength
>
0
?
propertiesLength
:
0
)
//propertiesLength
+
0
;
+
4
//MAGICCODE
+
4
//BODYCRC
+
4
//QUEUEID
+
4
//FLAG
+
8
//QUEUEOFFSET
+
8
//PHYSICALOFFSET
+
4
//SYSFLAG
+
8
//BORNTIMESTAMP
+
8
//BORNHOST
+
8
//STORETIMESTAMP
+
8
//STOREHOSTADDRESS
+
4
//RECONSUMETIMES
+
8
//Prepared Transaction Offset
+
4
+
(
bodyLength
>
0
?
bodyLength
:
0
)
//BODY
+
1
+
topicLength
//TOPIC
+
2
+
(
propertiesLength
>
0
?
propertiesLength
:
0
)
//propertiesLength
+
0
;
return
msgLen
;
}
...
...
@@ -390,7 +399,7 @@ public class CommitLog {
this
.
confirmOffset
=
phyOffset
;
}
public
void
recoverAbnormally
()
{
public
void
recoverAbnormally
(
long
maxPhyOffsetOfConsumeQueue
)
{
// recover by the minimum time stamp
boolean
checkCRCOnRecover
=
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isCheckCRCOnRecover
();
final
List
<
MappedFile
>
mappedFiles
=
this
.
mappedFileQueue
.
getMappedFiles
();
...
...
@@ -418,41 +427,41 @@ public class CommitLog {
DispatchRequest
dispatchRequest
=
this
.
checkMessageAndReturnSize
(
byteBuffer
,
checkCRCOnRecover
);
int
size
=
dispatchRequest
.
getMsgSize
();
// Normal data
if
(
size
>
0
)
{
mappedFileOffset
+=
size
;
if
(
dispatchRequest
.
isSuccess
())
{
// Normal data
if
(
size
>
0
)
{
mappedFileOffset
+=
size
;
if
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isDuplicationEnable
())
{
if
(
dispatchRequest
.
getCommitLogOffset
()
<
this
.
defaultMessageStore
.
getConfirmOffset
())
{
if
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isDuplicationEnable
())
{
if
(
dispatchRequest
.
getCommitLogOffset
()
<
this
.
defaultMessageStore
.
getConfirmOffset
())
{
this
.
defaultMessageStore
.
doDispatch
(
dispatchRequest
);
}
}
else
{
this
.
defaultMessageStore
.
doDispatch
(
dispatchRequest
);
}
}
else
{
this
.
defaultMessageStore
.
doDispatch
(
dispatchRequest
);
}
}
// Intermediate file read error
else
if
(
size
==
-
1
)
{
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else
if
(
size
==
0
)
{
index
++;
if
(
index
>=
mappedFiles
.
size
())
{
// The current branch under normal circumstances should
// not happen
log
.
info
(
"recover physics file over, last mapped file "
+
mappedFile
.
getFileName
());
break
;
}
else
{
mappedFile
=
mappedFiles
.
get
(
index
);
byteBuffer
=
mappedFile
.
sliceByteBuffer
();
processOffset
=
mappedFile
.
getFileFromOffset
();
mappedFileOffset
=
0
;
log
.
info
(
"recover next physics file, "
+
mappedFile
.
getFileName
());
}
}
}
else
{
log
.
info
(
"recover physics file end, "
+
mappedFile
.
getFileName
());
break
;
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else
if
(
size
==
0
)
{
index
++;
if
(
index
>=
mappedFiles
.
size
())
{
// The current branch under normal circumstances should
// not happen
log
.
info
(
"recover physics file over, last mapped file "
+
mappedFile
.
getFileName
());
break
;
}
else
{
mappedFile
=
mappedFiles
.
get
(
index
);
byteBuffer
=
mappedFile
.
sliceByteBuffer
();
processOffset
=
mappedFile
.
getFileFromOffset
();
mappedFileOffset
=
0
;
log
.
info
(
"recover next physics file, "
+
mappedFile
.
getFileName
());
}
}
}
processOffset
+=
mappedFileOffset
;
...
...
@@ -461,7 +470,10 @@ public class CommitLog {
this
.
mappedFileQueue
.
truncateDirtyFiles
(
processOffset
);
// Clear ConsumeQueue redundant data
this
.
defaultMessageStore
.
truncateDirtyLogicFiles
(
processOffset
);
if
(
maxPhyOffsetOfConsumeQueue
>=
processOffset
)
{
log
.
warn
(
"maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files"
,
maxPhyOffsetOfConsumeQueue
,
processOffset
);
this
.
defaultMessageStore
.
truncateDirtyLogicFiles
(
processOffset
);
}
}
// Commitlog case files are deleted
else
{
...
...
@@ -485,18 +497,18 @@ public class CommitLog {
}
if
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isMessageIndexEnable
()
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isMessageIndexSafe
())
{
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isMessageIndexSafe
())
{
if
(
storeTimestamp
<=
this
.
defaultMessageStore
.
getStoreCheckpoint
().
getMinTimestampIndex
())
{
log
.
info
(
"find check timestamp, {} {}"
,
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
return
true
;
}
}
else
{
if
(
storeTimestamp
<=
this
.
defaultMessageStore
.
getStoreCheckpoint
().
getMinTimestamp
())
{
log
.
info
(
"find check timestamp, {} {}"
,
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
return
true
;
}
}
...
...
@@ -532,7 +544,7 @@ public class CommitLog {
final
int
tranType
=
MessageSysFlag
.
getTransactionValue
(
msg
.
getSysFlag
());
if
(
tranType
==
MessageSysFlag
.
TRANSACTION_NOT_TYPE
||
tranType
==
MessageSysFlag
.
TRANSACTION_COMMIT_TYPE
)
{
||
tranType
==
MessageSysFlag
.
TRANSACTION_COMMIT_TYPE
)
{
// Delay Delivery
if
(
msg
.
getDelayTimeLevel
()
>
0
)
{
if
(
msg
.
getDelayTimeLevel
()
>
this
.
defaultMessageStore
.
getScheduleMessageService
().
getMaxDelayLevel
())
{
...
...
@@ -638,7 +650,7 @@ public class CommitLog {
boolean
flushOK
=
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
if
(!
flushOK
)
{
log
.
error
(
"do groupcommit, wait for flush failed, topic: "
+
messageExt
.
getTopic
()
+
" tags: "
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostString
());
+
" client address: "
+
messageExt
.
getBornHostString
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_DISK_TIMEOUT
);
}
}
else
{
...
...
@@ -665,10 +677,10 @@ public class CommitLog {
service
.
putRequest
(
request
);
service
.
getWaitNotifyObject
().
wakeupAll
();
boolean
flushOK
=
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
if
(!
flushOK
)
{
log
.
error
(
"do sync transfer other node, wait return, but failed, topic: "
+
messageExt
.
getTopic
()
+
" tags: "
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostNameString
());
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostNameString
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_SLAVE_TIMEOUT
);
}
}
...
...
@@ -905,7 +917,7 @@ public class CommitLog {
int
commitDataLeastPages
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getCommitCommitLogLeastPages
();
int
commitDataThoroughInterval
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getCommitCommitLogThoroughInterval
();
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getCommitCommitLogThoroughInterval
();
long
begin
=
System
.
currentTimeMillis
();
if
(
begin
>=
(
this
.
lastCommitTimestamp
+
commitDataThoroughInterval
))
{
...
...
@@ -954,7 +966,7 @@ public class CommitLog {
int
flushPhysicQueueLeastPages
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushCommitLogLeastPages
();
int
flushPhysicQueueThoroughInterval
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushCommitLogThoroughInterval
();
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushCommitLogThoroughInterval
();
boolean
printFlushProgress
=
false
;
...
...
@@ -1175,7 +1187,7 @@ public class CommitLog {
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBrokerInner
msgInner
)
{
final
MessageExtBrokerInner
msgInner
)
{
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
...
...
@@ -1215,7 +1227,7 @@ public class CommitLog {
* Serialize message
*/
final
byte
[]
propertiesData
=
msgInner
.
getPropertiesString
()
==
null
?
null
:
msgInner
.
getPropertiesString
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
msgInner
.
getPropertiesString
()
==
null
?
null
:
msgInner
.
getPropertiesString
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
int
propertiesLength
=
propertiesData
==
null
?
0
:
propertiesData
.
length
;
...
...
@@ -1234,7 +1246,7 @@ public class CommitLog {
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLength
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
MESSAGE_SIZE_EXCEEDED
);
}
...
...
@@ -1250,7 +1262,7 @@ public class CommitLog {
final
long
beginTimeMills
=
CommitLog
.
this
.
defaultMessageStore
.
now
();
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
maxBlank
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgId
,
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
}
// Initialization of storage space
...
...
@@ -1303,7 +1315,7 @@ public class CommitLog {
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
msgLen
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
msgLen
,
msgId
,
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
switch
(
tranType
)
{
case
MessageSysFlag
.
TRANSACTION_PREPARED_TYPE
:
...
...
@@ -1321,7 +1333,7 @@ public class CommitLog {
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBatch
messageExtBatch
)
{
final
MessageExtBatch
messageExtBatch
)
{
byteBuffer
.
mark
();
//physical offset
long
wroteOffset
=
fileFromOffset
+
byteBuffer
.
position
();
...
...
@@ -1353,7 +1365,7 @@ public class CommitLog {
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLen
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
MESSAGE_SIZE_EXCEEDED
);
}
totalMsgLen
+=
msgLen
;
...
...
@@ -1371,7 +1383,7 @@ public class CommitLog {
byteBuffer
.
reset
();
//ignore the previous appended messages
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
8
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgIdBuilder
.
toString
(),
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
}
//move to add queue offset and commitlog offset
messagesByteBuff
.
position
(
msgPos
+
20
);
...
...
@@ -1395,7 +1407,7 @@ public class CommitLog {
byteBuffer
.
put
(
messagesByteBuff
);
messageExtBatch
.
setEncodedBuff
(
null
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
totalMsgLen
,
msgIdBuilder
.
toString
(),
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
result
.
setMsgNum
(
msgNum
);
CommitLog
.
this
.
topicQueueTable
.
put
(
key
,
queueOffset
);
...
...
@@ -1454,7 +1466,7 @@ public class CommitLog {
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLen
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
throw
new
RuntimeException
(
"message size exceeded"
);
}
...
...
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
da73a1d1
...
...
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.ServiceThread
;
...
...
@@ -93,7 +94,7 @@ public class DefaultMessageStore implements MessageStore {
private
final
SystemClock
systemClock
=
new
SystemClock
();
private
final
ScheduledExecutorService
scheduledExecutorService
=
Executors
.
newSingleThreadScheduledExecutor
(
new
ThreadFactoryImpl
(
"StoreScheduledThread"
));
Executors
.
newSingleThreadScheduledExecutor
(
new
ThreadFactoryImpl
(
"StoreScheduledThread"
));
private
final
BrokerStatsManager
brokerStatsManager
;
private
final
MessageArrivingListener
messageArrivingListener
;
private
final
BrokerConfig
brokerConfig
;
...
...
@@ -113,7 +114,7 @@ public class DefaultMessageStore implements MessageStore {
boolean
shutDownNormal
=
false
;
public
DefaultMessageStore
(
final
MessageStoreConfig
messageStoreConfig
,
final
BrokerStatsManager
brokerStatsManager
,
final
MessageArrivingListener
messageArrivingListener
,
final
BrokerConfig
brokerConfig
)
throws
IOException
{
final
MessageArrivingListener
messageArrivingListener
,
final
BrokerConfig
brokerConfig
)
throws
IOException
{
this
.
messageArrivingListener
=
messageArrivingListener
;
this
.
brokerConfig
=
brokerConfig
;
this
.
messageStoreConfig
=
messageStoreConfig
;
...
...
@@ -184,7 +185,7 @@ public class DefaultMessageStore implements MessageStore {
if
(
result
)
{
this
.
storeCheckpoint
=
new
StoreCheckpoint
(
StorePathConfigHelper
.
getStoreCheckpoint
(
this
.
messageStoreConfig
.
getStorePathRootDir
()));
new
StoreCheckpoint
(
StorePathConfigHelper
.
getStoreCheckpoint
(
this
.
messageStoreConfig
.
getStorePathRootDir
()));
this
.
indexService
.
load
(
lastExitOK
);
...
...
@@ -437,8 +438,8 @@ public class DefaultMessageStore implements MessageStore {
}
public
GetMessageResult
getMessage
(
final
String
group
,
final
String
topic
,
final
int
queueId
,
final
long
offset
,
final
int
maxMsgNums
,
final
MessageFilter
messageFilter
)
{
final
int
maxMsgNums
,
final
MessageFilter
messageFilter
)
{
if
(
this
.
shutdown
)
{
log
.
warn
(
"message store has shutdown, so getMessage is forbidden"
);
return
null
;
...
...
@@ -509,7 +510,7 @@ public class DefaultMessageStore implements MessageStore {
boolean
isInDisk
=
checkInDiskByCommitOffset
(
offsetPy
,
maxOffsetPy
);
if
(
this
.
isTheBatchFull
(
sizePy
,
maxMsgNums
,
getResult
.
getBufferTotalSize
(),
getResult
.
getMessageCount
(),
isInDisk
))
{
isInDisk
))
{
break
;
}
...
...
@@ -521,13 +522,13 @@ public class DefaultMessageStore implements MessageStore {
}
else
{
// can't find ext content.Client will filter messages by tag also.
log
.
error
(
"[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}"
,
tagsCode
,
offsetPy
,
sizePy
,
topic
,
group
);
tagsCode
,
offsetPy
,
sizePy
,
topic
,
group
);
isTagsCodeLegal
=
false
;
}
}
if
(
messageFilter
!=
null
&&
!
messageFilter
.
isMatchedByConsumeQueue
(
isTagsCodeLegal
?
tagsCode
:
null
,
extRet
?
cqExtUnit
:
null
))
{
&&
!
messageFilter
.
isMatchedByConsumeQueue
(
isTagsCodeLegal
?
tagsCode
:
null
,
extRet
?
cqExtUnit
:
null
))
{
if
(
getResult
.
getBufferTotalSize
()
==
0
)
{
status
=
GetMessageStatus
.
NO_MATCHED_MESSAGE
;
}
...
...
@@ -546,7 +547,7 @@ public class DefaultMessageStore implements MessageStore {
}
if
(
messageFilter
!=
null
&&
!
messageFilter
.
isMatchedByCommitLog
(
selectResult
.
getByteBuffer
().
slice
(),
null
))
{
&&
!
messageFilter
.
isMatchedByCommitLog
(
selectResult
.
getByteBuffer
().
slice
(),
null
))
{
if
(
getResult
.
getBufferTotalSize
()
==
0
)
{
status
=
GetMessageStatus
.
NO_MATCHED_MESSAGE
;
}
...
...
@@ -570,7 +571,7 @@ public class DefaultMessageStore implements MessageStore {
long
diff
=
maxOffsetPy
-
maxPhyOffsetPulling
;
long
memory
=
(
long
)
(
StoreUtil
.
TOTAL_PHYSICAL_MEMORY_SIZE
*
(
this
.
messageStoreConfig
.
getAccessMessageInMemoryMaxRatio
()
/
100.0
));
*
(
this
.
messageStoreConfig
.
getAccessMessageInMemoryMaxRatio
()
/
100.0
));
getResult
.
setSuggestPullingFromSlave
(
diff
>
memory
);
}
finally
{
...
...
@@ -580,7 +581,7 @@ public class DefaultMessageStore implements MessageStore {
status
=
GetMessageStatus
.
OFFSET_FOUND_NULL
;
nextBeginOffset
=
nextOffsetCorrection
(
offset
,
consumeQueue
.
rollNextFile
(
offset
));
log
.
warn
(
"consumer request topic: "
+
topic
+
"offset: "
+
offset
+
" minOffset: "
+
minOffset
+
" maxOffset: "
+
maxOffset
+
", but access logic queue failed."
);
+
maxOffset
+
", but access logic queue failed."
);
}
}
}
else
{
...
...
@@ -910,8 +911,8 @@ public class DefaultMessageStore implements MessageStore {
for
(
ConsumeQueue
cq
:
queueTable
.
values
())
{
cq
.
destroy
();
log
.
info
(
"cleanUnusedTopic: {} {} ConsumeQueue cleaned"
,
cq
.
getTopic
(),
cq
.
getQueueId
()
cq
.
getTopic
(),
cq
.
getQueueId
()
);
this
.
commitLog
.
removeQueueFromTopicQueueTable
(
cq
.
getTopic
(),
cq
.
getQueueId
());
...
...
@@ -941,20 +942,20 @@ public class DefaultMessageStore implements MessageStore {
if
(
maxCLOffsetInConsumeQueue
==
-
1
)
{
log
.
warn
(
"maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}."
,
nextQT
.
getValue
().
getTopic
(),
nextQT
.
getValue
().
getQueueId
(),
nextQT
.
getValue
().
getMaxPhysicOffset
(),
nextQT
.
getValue
().
getMinLogicOffset
());
nextQT
.
getValue
().
getTopic
(),
nextQT
.
getValue
().
getQueueId
(),
nextQT
.
getValue
().
getMaxPhysicOffset
(),
nextQT
.
getValue
().
getMinLogicOffset
());
}
else
if
(
maxCLOffsetInConsumeQueue
<
minCommitLogOffset
)
{
log
.
info
(
"cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}"
,
topic
,
nextQT
.
getKey
(),
minCommitLogOffset
,
maxCLOffsetInConsumeQueue
);
"cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}"
,
topic
,
nextQT
.
getKey
(),
minCommitLogOffset
,
maxCLOffsetInConsumeQueue
);
DefaultMessageStore
.
this
.
commitLog
.
removeQueueFromTopicQueueTable
(
nextQT
.
getValue
().
getTopic
(),
nextQT
.
getValue
().
getQueueId
());
nextQT
.
getValue
().
getQueueId
());
nextQT
.
getValue
().
destroy
();
itQT
.
remove
();
...
...
@@ -970,7 +971,7 @@ public class DefaultMessageStore implements MessageStore {
}
public
Map
<
String
,
Long
>
getMessageIds
(
final
String
topic
,
final
int
queueId
,
long
minOffset
,
long
maxOffset
,
SocketAddress
storeHost
)
{
SocketAddress
storeHost
)
{
Map
<
String
,
Long
>
messageIds
=
new
HashMap
<
String
,
Long
>();
if
(
this
.
shutdown
)
{
return
messageIds
;
...
...
@@ -995,7 +996,7 @@ public class DefaultMessageStore implements MessageStore {
long
offsetPy
=
bufferConsumeQueue
.
getByteBuffer
().
getLong
();
final
ByteBuffer
msgIdMemory
=
ByteBuffer
.
allocate
(
MessageDecoder
.
MSG_ID_LENGTH
);
String
msgId
=
MessageDecoder
.
createMessageId
(
msgIdMemory
,
MessageExt
.
socketAddress2ByteBuffer
(
storeHost
),
offsetPy
);
MessageDecoder
.
createMessageId
(
msgIdMemory
,
MessageExt
.
socketAddress2ByteBuffer
(
storeHost
),
offsetPy
);
messageIds
.
put
(
msgId
,
nextOffset
++);
if
(
nextOffset
>
maxOffset
)
{
return
messageIds
;
...
...
@@ -1091,11 +1092,11 @@ public class DefaultMessageStore implements MessageStore {
ConsumeQueue
logic
=
map
.
get
(
queueId
);
if
(
null
==
logic
)
{
ConsumeQueue
newLogic
=
new
ConsumeQueue
(
topic
,
queueId
,
StorePathConfigHelper
.
getStorePathConsumeQueue
(
this
.
messageStoreConfig
.
getStorePathRootDir
()),
this
.
getMessageStoreConfig
().
getMapedFileSizeConsumeQueue
(),
this
);
topic
,
queueId
,
StorePathConfigHelper
.
getStorePathConsumeQueue
(
this
.
messageStoreConfig
.
getStorePathRootDir
()),
this
.
getMessageStoreConfig
().
getMapedFileSizeConsumeQueue
(),
this
);
ConsumeQueue
oldLogic
=
map
.
putIfAbsent
(
queueId
,
newLogic
);
if
(
oldLogic
!=
null
)
{
logic
=
oldLogic
;
...
...
@@ -1195,7 +1196,7 @@ public class DefaultMessageStore implements MessageStore {
String
stack
=
UtilAll
.
jstack
();
final
String
fileName
=
System
.
getProperty
(
"user.home"
)
+
File
.
separator
+
"debug/lock/stack-"
+
DefaultMessageStore
.
this
.
commitLog
.
getBeginTimeInLock
()
+
"-"
+
lockTime
;
+
DefaultMessageStore
.
this
.
commitLog
.
getBeginTimeInLock
()
+
"-"
+
lockTime
;
MixAll
.
string2FileNotSafe
(
stack
,
fileName
);
}
}
...
...
@@ -1256,11 +1257,11 @@ public class DefaultMessageStore implements MessageStore {
continue
;
}
ConsumeQueue
logic
=
new
ConsumeQueue
(
topic
,
queueId
,
StorePathConfigHelper
.
getStorePathConsumeQueue
(
this
.
messageStoreConfig
.
getStorePathRootDir
()),
this
.
getMessageStoreConfig
().
getMapedFileSizeConsumeQueue
(),
this
);
topic
,
queueId
,
StorePathConfigHelper
.
getStorePathConsumeQueue
(
this
.
messageStoreConfig
.
getStorePathRootDir
()),
this
.
getMessageStoreConfig
().
getMapedFileSizeConsumeQueue
(),
this
);
this
.
putConsumeQueue
(
topic
,
queueId
,
logic
);
if
(!
logic
.
load
())
{
return
false
;
...
...
@@ -1276,12 +1277,12 @@ public class DefaultMessageStore implements MessageStore {
}
private
void
recover
(
final
boolean
lastExitOK
)
{
this
.
recoverConsumeQueue
();
long
maxPhyOffsetOfConsumeQueue
=
this
.
recoverConsumeQueue
();
if
(
lastExitOK
)
{
this
.
commitLog
.
recoverNormally
();
this
.
commitLog
.
recoverNormally
(
maxPhyOffsetOfConsumeQueue
);
}
else
{
this
.
commitLog
.
recoverAbnormally
();
this
.
commitLog
.
recoverAbnormally
(
maxPhyOffsetOfConsumeQueue
);
}
this
.
recoverTopicQueueTable
();
...
...
@@ -1306,12 +1307,18 @@ public class DefaultMessageStore implements MessageStore {
}
}
private
void
recoverConsumeQueue
()
{
private
long
recoverConsumeQueue
()
{
long
maxPhysicOffset
=
-
1
;
for
(
ConcurrentMap
<
Integer
,
ConsumeQueue
>
maps
:
this
.
consumeQueueTable
.
values
())
{
for
(
ConsumeQueue
logic
:
maps
.
values
())
{
logic
.
recover
();
if
(
logic
.
getMaxPhysicOffset
()
>
maxPhysicOffset
)
{
maxPhysicOffset
=
logic
.
getMaxPhysicOffset
();
}
}
}
return
maxPhysicOffset
;
}
private
void
recoverTopicQueueTable
()
{
...
...
@@ -1438,10 +1445,10 @@ public class DefaultMessageStore implements MessageStore {
private
final
static
int
MAX_MANUAL_DELETE_FILE_TIMES
=
20
;
private
final
double
diskSpaceWarningLevelRatio
=
Double
.
parseDouble
(
System
.
getProperty
(
"rocketmq.broker.diskSpaceWarningLevelRatio"
,
"0.90"
));
Double
.
parseDouble
(
System
.
getProperty
(
"rocketmq.broker.diskSpaceWarningLevelRatio"
,
"0.90"
));
private
final
double
diskSpaceCleanForciblyRatio
=
Double
.
parseDouble
(
System
.
getProperty
(
"rocketmq.broker.diskSpaceCleanForciblyRatio"
,
"0.85"
));
Double
.
parseDouble
(
System
.
getProperty
(
"rocketmq.broker.diskSpaceCleanForciblyRatio"
,
"0.85"
));
private
long
lastRedeleteTimestamp
=
0
;
private
volatile
int
manualDeleteFileSeveralTimes
=
0
;
...
...
@@ -1481,16 +1488,16 @@ public class DefaultMessageStore implements MessageStore {
boolean
cleanAtOnce
=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
isCleanFileForciblyEnable
()
&&
this
.
cleanImmediately
;
log
.
info
(
"begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}"
,
fileReservedTime
,
timeup
,
spacefull
,
manualDeleteFileSeveralTimes
,
cleanAtOnce
);
fileReservedTime
,
timeup
,
spacefull
,
manualDeleteFileSeveralTimes
,
cleanAtOnce
);
fileReservedTime
*=
60
*
60
*
1000
;
deleteCount
=
DefaultMessageStore
.
this
.
commitLog
.
deleteExpiredFile
(
fileReservedTime
,
deletePhysicFilesInterval
,
destroyMapedFileIntervalForcibly
,
cleanAtOnce
);
destroyMapedFileIntervalForcibly
,
cleanAtOnce
);
if
(
deleteCount
>
0
)
{
}
else
if
(
spacefull
)
{
log
.
warn
(
"disk space will be full soon, but delete file failed."
);
...
...
@@ -1504,7 +1511,7 @@ public class DefaultMessageStore implements MessageStore {
if
((
currentTimestamp
-
this
.
lastRedeleteTimestamp
)
>
interval
)
{
this
.
lastRedeleteTimestamp
=
currentTimestamp
;
int
destroyMapedFileIntervalForcibly
=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getDestroyMapedFileIntervalForcibly
();
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getDestroyMapedFileIntervalForcibly
();
if
(
DefaultMessageStore
.
this
.
commitLog
.
retryDeleteFirstFile
(
destroyMapedFileIntervalForcibly
))
{
}
}
...
...
@@ -1556,7 +1563,7 @@ public class DefaultMessageStore implements MessageStore {
{
String
storePathLogics
=
StorePathConfigHelper
.
getStorePathConsumeQueue
(
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getStorePathRootDir
());
.
getStorePathConsumeQueue
(
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getStorePathRootDir
());
double
logicsRatio
=
UtilAll
.
getDiskPartitionSpaceUsedPercent
(
storePathLogics
);
if
(
logicsRatio
>
diskSpaceWarningLevelRatio
)
{
boolean
diskok
=
DefaultMessageStore
.
this
.
runningFlags
.
getAndMakeDiskFull
();
...
...
@@ -1726,7 +1733,7 @@ public class DefaultMessageStore implements MessageStore {
if
(
this
.
isCommitLogAvailable
())
{
log
.
warn
(
"shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}"
,
DefaultMessageStore
.
this
.
commitLog
.
getMaxOffset
(),
this
.
reputFromOffset
);
DefaultMessageStore
.
this
.
commitLog
.
getMaxOffset
(),
this
.
reputFromOffset
);
}
super
.
shutdown
();
...
...
@@ -1744,7 +1751,7 @@ public class DefaultMessageStore implements MessageStore {
for
(
boolean
doNext
=
true
;
this
.
isCommitLogAvailable
()
&&
doNext
;
)
{
if
(
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
isDuplicationEnable
()
&&
this
.
reputFromOffset
>=
DefaultMessageStore
.
this
.
getConfirmOffset
())
{
&&
this
.
reputFromOffset
>=
DefaultMessageStore
.
this
.
getConfirmOffset
())
{
break
;
}
...
...
@@ -1755,7 +1762,7 @@ public class DefaultMessageStore implements MessageStore {
for
(
int
readSize
=
0
;
readSize
<
result
.
getSize
()
&&
doNext
;
)
{
DispatchRequest
dispatchRequest
=
DefaultMessageStore
.
this
.
commitLog
.
checkMessageAndReturnSize
(
result
.
getByteBuffer
(),
false
,
false
);
DefaultMessageStore
.
this
.
commitLog
.
checkMessageAndReturnSize
(
result
.
getByteBuffer
(),
false
,
false
);
int
size
=
dispatchRequest
.
getMsgSize
();
if
(
dispatchRequest
.
isSuccess
())
{
...
...
@@ -1763,21 +1770,21 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore
.
this
.
doDispatch
(
dispatchRequest
);
if
(
BrokerRole
.
SLAVE
!=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getBrokerRole
()
&&
DefaultMessageStore
.
this
.
brokerConfig
.
isLongPollingEnable
())
{
&&
DefaultMessageStore
.
this
.
brokerConfig
.
isLongPollingEnable
())
{
DefaultMessageStore
.
this
.
messageArrivingListener
.
arriving
(
dispatchRequest
.
getTopic
(),
dispatchRequest
.
getQueueId
(),
dispatchRequest
.
getConsumeQueueOffset
()
+
1
,
dispatchRequest
.
getTagsCode
(),
dispatchRequest
.
getStoreTimestamp
(),
dispatchRequest
.
getBitMap
(),
dispatchRequest
.
getPropertiesMap
());
dispatchRequest
.
getQueueId
(),
dispatchRequest
.
getConsumeQueueOffset
()
+
1
,
dispatchRequest
.
getTagsCode
(),
dispatchRequest
.
getStoreTimestamp
(),
dispatchRequest
.
getBitMap
(),
dispatchRequest
.
getPropertiesMap
());
}
this
.
reputFromOffset
+=
size
;
readSize
+=
size
;
if
(
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getBrokerRole
()
==
BrokerRole
.
SLAVE
)
{
DefaultMessageStore
.
this
.
storeStatsService
.
getSinglePutMessageTopicTimesTotal
(
dispatchRequest
.
getTopic
()).
incrementAndGet
();
.
getSinglePutMessageTopicTimesTotal
(
dispatchRequest
.
getTopic
()).
incrementAndGet
();
DefaultMessageStore
.
this
.
storeStatsService
.
getSinglePutMessageTopicSizeTotal
(
dispatchRequest
.
getTopic
())
.
addAndGet
(
dispatchRequest
.
getMsgSize
());
.
getSinglePutMessageTopicSizeTotal
(
dispatchRequest
.
getTopic
())
.
addAndGet
(
dispatchRequest
.
getMsgSize
());
}
}
else
if
(
size
==
0
)
{
this
.
reputFromOffset
=
DefaultMessageStore
.
this
.
commitLog
.
rollNextFile
(
this
.
reputFromOffset
);
...
...
@@ -1792,7 +1799,7 @@ public class DefaultMessageStore implements MessageStore {
doNext
=
false
;
if
(
DefaultMessageStore
.
this
.
brokerConfig
.
getBrokerId
()
==
MixAll
.
MASTER_ID
)
{
log
.
error
(
"[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}"
,
this
.
reputFromOffset
);
this
.
reputFromOffset
);
this
.
reputFromOffset
+=
result
.
getSize
()
-
readSize
;
}
...
...
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
浏览文件 @
da73a1d1
...
...
@@ -18,9 +18,12 @@
package
org.apache.rocketmq.store
;
import
java.io.File
;
import
java.io.RandomAccessFile
;
import
java.net.InetAddress
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.nio.MappedByteBuffer
;
import
java.nio.channels.FileChannel
;
import
java.nio.channels.OverlappingFileLockException
;
import
java.util.Map
;
import
java.util.concurrent.atomic.AtomicInteger
;
...
...
@@ -29,6 +32,7 @@ import org.apache.rocketmq.common.BrokerConfig;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.store.config.FlushDiskType
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.config.StorePathConfigHelper
;
import
org.junit.After
;
import
org.apache.rocketmq.store.stats.BrokerStatsManager
;
import
org.junit.Before
;
...
...
@@ -171,6 +175,120 @@ public class DefaultMessageStoreTest {
assertThat
(
getMessageResult45
.
getMessageBufferList
().
size
()).
isEqualTo
(
10
);
}
@Test
public
void
testRecover
()
throws
Exception
{
String
topic
=
"recoverTopic"
;
MessageBody
=
StoreMessage
.
getBytes
();
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
MessageExtBrokerInner
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
Thread
.
sleep
(
100
);
//wait for build consumer queue
long
maxPhyOffset
=
messageStore
.
getMaxPhyOffset
();
long
maxCqOffset
=
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
);
//1.just reboot
messageStore
.
shutdown
();
messageStore
=
buildMessageStore
();
boolean
load
=
messageStore
.
load
();
assertTrue
(
load
);
messageStore
.
start
();
assertTrue
(
maxPhyOffset
==
messageStore
.
getMaxPhyOffset
());
assertTrue
(
maxCqOffset
==
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
));
//2.damage commitlog and reboot normal
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
MessageExtBrokerInner
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
Thread
.
sleep
(
100
);
long
secondLastPhyOffset
=
messageStore
.
getMaxPhyOffset
();
long
secondLastCqOffset
=
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
);
MessageExtBrokerInner
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
messageStore
.
shutdown
();
//damage last message
damageCommitlog
(
secondLastPhyOffset
);
//reboot
messageStore
=
buildMessageStore
();
load
=
messageStore
.
load
();
assertTrue
(
load
);
messageStore
.
start
();
assertTrue
(
secondLastPhyOffset
==
messageStore
.
getMaxPhyOffset
());
assertTrue
(
secondLastCqOffset
==
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
));
//3.damage commitlog and reboot abnormal
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
Thread
.
sleep
(
100
);
secondLastPhyOffset
=
messageStore
.
getMaxPhyOffset
();
secondLastCqOffset
=
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
);
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
messageStore
.
shutdown
();
//damage last message
damageCommitlog
(
secondLastPhyOffset
);
//add abort file
String
fileName
=
StorePathConfigHelper
.
getAbortFile
(((
DefaultMessageStore
)
messageStore
).
getMessageStoreConfig
().
getStorePathRootDir
());
File
file
=
new
File
(
fileName
);
MappedFile
.
ensureDirOK
(
file
.
getParent
());
file
.
createNewFile
();
messageStore
=
buildMessageStore
();
load
=
messageStore
.
load
();
assertTrue
(
load
);
messageStore
.
start
();
assertTrue
(
secondLastPhyOffset
==
messageStore
.
getMaxPhyOffset
());
assertTrue
(
secondLastCqOffset
==
messageStore
.
getMaxOffsetInQueue
(
topic
,
0
));
//message write again
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
messageExtBrokerInner
=
buildMessage
();
messageExtBrokerInner
.
setTopic
(
topic
);
messageExtBrokerInner
.
setQueueId
(
0
);
messageStore
.
putMessage
(
messageExtBrokerInner
);
}
}
private
void
damageCommitlog
(
long
offset
)
throws
Exception
{
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
File
file
=
new
File
(
messageStoreConfig
.
getStorePathCommitLog
()
+
File
.
separator
+
"00000000000000000000"
);
FileChannel
fileChannel
=
new
RandomAccessFile
(
file
,
"rw"
).
getChannel
();
MappedByteBuffer
mappedByteBuffer
=
fileChannel
.
map
(
FileChannel
.
MapMode
.
READ_WRITE
,
0
,
1024
*
1024
*
10
);
int
bodyLen
=
mappedByteBuffer
.
getInt
((
int
)
offset
+
84
);
int
topicLenIndex
=
(
int
)
offset
+
84
+
bodyLen
+
4
;
mappedByteBuffer
.
position
(
topicLenIndex
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
putInt
(
0
);
mappedByteBuffer
.
force
();
fileChannel
.
force
(
true
);
fileChannel
.
close
();
}
private
class
MyMessageArrivingListener
implements
MessageArrivingListener
{
@Override
public
void
arriving
(
String
topic
,
int
queueId
,
long
logicOffset
,
long
tagsCode
,
long
msgStoreTime
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录