Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
af352d51
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
af352d51
编写于
10月 19, 2018
作者:
S
suiyuzeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
code style
上级
da73a1d1
变更
3
展开全部
隐藏空白更改
内联
并排
Showing
3 changed file
with
118 addition
and
118 deletion
+118
-118
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+59
-59
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+58
-58
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
...va/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+1
-1
未找到文件。
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
af352d51
...
...
@@ -66,7 +66,7 @@ public class CommitLog {
public
CommitLog
(
final
DefaultMessageStore
defaultMessageStore
)
{
this
.
mappedFileQueue
=
new
MappedFileQueue
(
defaultMessageStore
.
getMessageStoreConfig
().
getStorePathCommitLog
(),
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
(),
defaultMessageStore
.
getAllocateMappedFileService
());
defaultMessageStore
.
getMessageStoreConfig
().
getMapedFileSizeCommitLog
(),
defaultMessageStore
.
getAllocateMappedFileService
());
this
.
defaultMessageStore
=
defaultMessageStore
;
if
(
FlushDiskType
.
SYNC_FLUSH
==
defaultMessageStore
.
getMessageStoreConfig
().
getFlushDiskType
())
{
...
...
@@ -129,10 +129,10 @@ public class CommitLog {
}
public
int
deleteExpiredFile
(
final
long
expiredTime
,
final
int
deleteFilesInterval
,
final
long
intervalForcibly
,
final
boolean
cleanImmediately
final
long
expiredTime
,
final
int
deleteFilesInterval
,
final
long
intervalForcibly
,
final
boolean
cleanImmediately
)
{
return
this
.
mappedFileQueue
.
deleteExpiredFileByTime
(
expiredTime
,
deleteFilesInterval
,
intervalForcibly
,
cleanImmediately
);
}
...
...
@@ -234,7 +234,7 @@ public class CommitLog {
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
public
DispatchRequest
checkMessageAndReturnSize
(
java
.
nio
.
ByteBuffer
byteBuffer
,
final
boolean
checkCRC
,
final
boolean
readBody
)
{
final
boolean
readBody
)
{
try
{
// 1 TOTAL SIZE
int
totalSize
=
byteBuffer
.
getInt
();
...
...
@@ -330,7 +330,7 @@ public class CommitLog {
if
(
delayLevel
>
0
)
{
tagsCode
=
this
.
defaultMessageStore
.
getScheduleMessageService
().
computeDeliverTimestamp
(
delayLevel
,
storeTimestamp
);
storeTimestamp
);
}
}
}
...
...
@@ -344,24 +344,24 @@ public class CommitLog {
doNothingForDeadCode
(
byteBuffer1
);
doNothingForDeadCode
(
byteBuffer2
);
log
.
error
(
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}"
,
totalSize
,
readLength
,
bodyLen
,
topicLen
,
propertiesLength
);
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}"
,
totalSize
,
readLength
,
bodyLen
,
topicLen
,
propertiesLength
);
return
new
DispatchRequest
(
totalSize
,
false
/* success */
);
}
return
new
DispatchRequest
(
topic
,
queueId
,
physicOffset
,
totalSize
,
tagsCode
,
storeTimestamp
,
queueOffset
,
keys
,
uniqKey
,
sysFlag
,
preparedTransactionOffset
,
propertiesMap
topic
,
queueId
,
physicOffset
,
totalSize
,
tagsCode
,
storeTimestamp
,
queueOffset
,
keys
,
uniqKey
,
sysFlag
,
preparedTransactionOffset
,
propertiesMap
);
}
catch
(
Exception
e
)
{
}
...
...
@@ -371,23 +371,23 @@ public class CommitLog {
private
static
int
calMsgLength
(
int
bodyLength
,
int
topicLength
,
int
propertiesLength
)
{
final
int
msgLen
=
4
//TOTALSIZE
+
4
//MAGICCODE
+
4
//BODYCRC
+
4
//QUEUEID
+
4
//FLAG
+
8
//QUEUEOFFSET
+
8
//PHYSICALOFFSET
+
4
//SYSFLAG
+
8
//BORNTIMESTAMP
+
8
//BORNHOST
+
8
//STORETIMESTAMP
+
8
//STOREHOSTADDRESS
+
4
//RECONSUMETIMES
+
8
//Prepared Transaction Offset
+
4
+
(
bodyLength
>
0
?
bodyLength
:
0
)
//BODY
+
1
+
topicLength
//TOPIC
+
2
+
(
propertiesLength
>
0
?
propertiesLength
:
0
)
//propertiesLength
+
0
;
+
4
//MAGICCODE
+
4
//BODYCRC
+
4
//QUEUEID
+
4
//FLAG
+
8
//QUEUEOFFSET
+
8
//PHYSICALOFFSET
+
4
//SYSFLAG
+
8
//BORNTIMESTAMP
+
8
//BORNHOST
+
8
//STORETIMESTAMP
+
8
//STOREHOSTADDRESS
+
4
//RECONSUMETIMES
+
8
//Prepared Transaction Offset
+
4
+
(
bodyLength
>
0
?
bodyLength
:
0
)
//BODY
+
1
+
topicLength
//TOPIC
+
2
+
(
propertiesLength
>
0
?
propertiesLength
:
0
)
//propertiesLength
+
0
;
return
msgLen
;
}
...
...
@@ -497,18 +497,18 @@ public class CommitLog {
}
if
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isMessageIndexEnable
()
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isMessageIndexSafe
())
{
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isMessageIndexSafe
())
{
if
(
storeTimestamp
<=
this
.
defaultMessageStore
.
getStoreCheckpoint
().
getMinTimestampIndex
())
{
log
.
info
(
"find check timestamp, {} {}"
,
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
return
true
;
}
}
else
{
if
(
storeTimestamp
<=
this
.
defaultMessageStore
.
getStoreCheckpoint
().
getMinTimestamp
())
{
log
.
info
(
"find check timestamp, {} {}"
,
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
storeTimestamp
,
UtilAll
.
timeMillisToHumanString
(
storeTimestamp
));
return
true
;
}
}
...
...
@@ -544,7 +544,7 @@ public class CommitLog {
final
int
tranType
=
MessageSysFlag
.
getTransactionValue
(
msg
.
getSysFlag
());
if
(
tranType
==
MessageSysFlag
.
TRANSACTION_NOT_TYPE
||
tranType
==
MessageSysFlag
.
TRANSACTION_COMMIT_TYPE
)
{
||
tranType
==
MessageSysFlag
.
TRANSACTION_COMMIT_TYPE
)
{
// Delay Delivery
if
(
msg
.
getDelayTimeLevel
()
>
0
)
{
if
(
msg
.
getDelayTimeLevel
()
>
this
.
defaultMessageStore
.
getScheduleMessageService
().
getMaxDelayLevel
())
{
...
...
@@ -650,7 +650,7 @@ public class CommitLog {
boolean
flushOK
=
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
if
(!
flushOK
)
{
log
.
error
(
"do groupcommit, wait for flush failed, topic: "
+
messageExt
.
getTopic
()
+
" tags: "
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostString
());
+
" client address: "
+
messageExt
.
getBornHostString
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_DISK_TIMEOUT
);
}
}
else
{
...
...
@@ -677,10 +677,10 @@ public class CommitLog {
service
.
putRequest
(
request
);
service
.
getWaitNotifyObject
().
wakeupAll
();
boolean
flushOK
=
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
request
.
waitForFlush
(
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getSyncFlushTimeout
());
if
(!
flushOK
)
{
log
.
error
(
"do sync transfer other node, wait return, but failed, topic: "
+
messageExt
.
getTopic
()
+
" tags: "
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostNameString
());
+
messageExt
.
getTags
()
+
" client address: "
+
messageExt
.
getBornHostNameString
());
putMessageResult
.
setPutMessageStatus
(
PutMessageStatus
.
FLUSH_SLAVE_TIMEOUT
);
}
}
...
...
@@ -917,7 +917,7 @@ public class CommitLog {
int
commitDataLeastPages
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getCommitCommitLogLeastPages
();
int
commitDataThoroughInterval
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getCommitCommitLogThoroughInterval
();
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getCommitCommitLogThoroughInterval
();
long
begin
=
System
.
currentTimeMillis
();
if
(
begin
>=
(
this
.
lastCommitTimestamp
+
commitDataThoroughInterval
))
{
...
...
@@ -966,7 +966,7 @@ public class CommitLog {
int
flushPhysicQueueLeastPages
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushCommitLogLeastPages
();
int
flushPhysicQueueThoroughInterval
=
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushCommitLogThoroughInterval
();
CommitLog
.
this
.
defaultMessageStore
.
getMessageStoreConfig
().
getFlushCommitLogThoroughInterval
();
boolean
printFlushProgress
=
false
;
...
...
@@ -1187,7 +1187,7 @@ public class CommitLog {
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBrokerInner
msgInner
)
{
final
MessageExtBrokerInner
msgInner
)
{
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
...
...
@@ -1227,7 +1227,7 @@ public class CommitLog {
* Serialize message
*/
final
byte
[]
propertiesData
=
msgInner
.
getPropertiesString
()
==
null
?
null
:
msgInner
.
getPropertiesString
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
msgInner
.
getPropertiesString
()
==
null
?
null
:
msgInner
.
getPropertiesString
().
getBytes
(
MessageDecoder
.
CHARSET_UTF8
);
final
int
propertiesLength
=
propertiesData
==
null
?
0
:
propertiesData
.
length
;
...
...
@@ -1246,7 +1246,7 @@ public class CommitLog {
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLength
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
MESSAGE_SIZE_EXCEEDED
);
}
...
...
@@ -1262,7 +1262,7 @@ public class CommitLog {
final
long
beginTimeMills
=
CommitLog
.
this
.
defaultMessageStore
.
now
();
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
maxBlank
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgId
,
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
}
// Initialization of storage space
...
...
@@ -1315,7 +1315,7 @@ public class CommitLog {
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
msgLen
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
msgLen
,
msgId
,
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
msgInner
.
getStoreTimestamp
(),
queueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
switch
(
tranType
)
{
case
MessageSysFlag
.
TRANSACTION_PREPARED_TYPE
:
...
...
@@ -1333,7 +1333,7 @@ public class CommitLog {
}
public
AppendMessageResult
doAppend
(
final
long
fileFromOffset
,
final
ByteBuffer
byteBuffer
,
final
int
maxBlank
,
final
MessageExtBatch
messageExtBatch
)
{
final
MessageExtBatch
messageExtBatch
)
{
byteBuffer
.
mark
();
//physical offset
long
wroteOffset
=
fileFromOffset
+
byteBuffer
.
position
();
...
...
@@ -1365,7 +1365,7 @@ public class CommitLog {
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLen
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
MESSAGE_SIZE_EXCEEDED
);
}
totalMsgLen
+=
msgLen
;
...
...
@@ -1383,7 +1383,7 @@ public class CommitLog {
byteBuffer
.
reset
();
//ignore the previous appended messages
byteBuffer
.
put
(
this
.
msgStoreItemMemory
.
array
(),
0
,
8
);
return
new
AppendMessageResult
(
AppendMessageStatus
.
END_OF_FILE
,
wroteOffset
,
maxBlank
,
msgIdBuilder
.
toString
(),
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
}
//move to add queue offset and commitlog offset
messagesByteBuff
.
position
(
msgPos
+
20
);
...
...
@@ -1407,7 +1407,7 @@ public class CommitLog {
byteBuffer
.
put
(
messagesByteBuff
);
messageExtBatch
.
setEncodedBuff
(
null
);
AppendMessageResult
result
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
totalMsgLen
,
msgIdBuilder
.
toString
(),
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
messageExtBatch
.
getStoreTimestamp
(),
beginQueueOffset
,
CommitLog
.
this
.
defaultMessageStore
.
now
()
-
beginTimeMills
);
result
.
setMsgNum
(
msgNum
);
CommitLog
.
this
.
topicQueueTable
.
put
(
key
,
queueOffset
);
...
...
@@ -1466,7 +1466,7 @@ public class CommitLog {
// Exceeds the maximum message
if
(
msgLen
>
this
.
maxMessageSize
)
{
CommitLog
.
log
.
warn
(
"message size exceeded, msg total size: "
+
msgLen
+
", msg body size: "
+
bodyLen
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
+
", maxMessageSize: "
+
this
.
maxMessageSize
);
throw
new
RuntimeException
(
"message size exceeded"
);
}
...
...
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
af352d51
此差异已折叠。
点击以展开。
store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
浏览文件 @
af352d51
...
...
@@ -292,7 +292,7 @@ public class DefaultMessageStoreTest {
private
class
MyMessageArrivingListener
implements
MessageArrivingListener
{
@Override
public
void
arriving
(
String
topic
,
int
queueId
,
long
logicOffset
,
long
tagsCode
,
long
msgStoreTime
,
byte
[]
filterBitMap
,
Map
<
String
,
String
>
properties
)
{
byte
[]
filterBitMap
,
Map
<
String
,
String
>
properties
)
{
}
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录