Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
9b0d037d
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
9b0d037d
编写于
7月 24, 2019
作者:
V
vongosling
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' of github.com:apache/rocketmq into develop
上级
664e0673
be7c6dd2
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
52 addition
and
51 deletion
+52
-51
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
...rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
+4
-3
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
...c/main/java/org/apache/rocketmq/common/ServiceThread.java
+2
-2
common/src/main/java/org/apache/rocketmq/common/UtilAll.java
common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+5
-5
common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
...ava/org/apache/rocketmq/common/stats/MomentStatsItem.java
+1
-1
common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
.../org/apache/rocketmq/common/stats/MomentStatsItemSet.java
+1
-1
common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
...main/java/org/apache/rocketmq/common/stats/StatsItem.java
+3
-3
common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
...n/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+3
-3
common/src/test/java/org/apache/rocketmq/common/protocol/QueryConsumeTimeSpanBodyTest.java
...ocketmq/common/protocol/QueryConsumeTimeSpanBodyTest.java
+2
-2
remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
...va/org/apache/rocketmq/remoting/common/ServiceThread.java
+2
-2
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
.../org/apache/rocketmq/store/AllocateMappedFileService.java
+3
-3
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+8
-8
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+10
-10
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
...e/src/main/java/org/apache/rocketmq/store/MappedFile.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
...a/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+5
-5
store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
.../main/java/org/apache/rocketmq/store/index/IndexFile.java
+1
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
9b0d037d
...
...
@@ -319,7 +319,7 @@ public class BrokerController {
this
.
registerProcessor
();
final
long
initialDelay
=
UtilAll
.
computNextMorningTimeMillis
()
-
System
.
currentTimeMillis
();
final
long
initialDelay
=
UtilAll
.
comput
e
NextMorningTimeMillis
()
-
System
.
currentTimeMillis
();
final
long
period
=
1000
*
60
*
60
*
24
;
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
...
...
broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
浏览文件 @
9b0d037d
...
...
@@ -18,6 +18,7 @@
package
org.apache.rocketmq.broker.filter
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
...
...
@@ -98,10 +99,10 @@ public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher {
request
.
setBitMap
(
filterBitMap
.
bytes
());
long
e
clipseTime
=
System
.
currentTimeMillis
()
-
startTime
;
long
e
lapsedTime
=
UtilAll
.
computeElapsedTimeMilliseconds
(
startTime
)
;
// 1ms
if
(
e
clipse
Time
>=
1
)
{
log
.
warn
(
"Spend {} ms to calc bit map, consumerNum={}, topic={}"
,
e
clipse
Time
,
filterDatas
.
size
(),
request
.
getTopic
());
if
(
e
lapsed
Time
>=
1
)
{
log
.
warn
(
"Spend {} ms to calc bit map, consumerNum={}, topic={}"
,
e
lapsed
Time
,
filterDatas
.
size
(),
request
.
getTopic
());
}
}
catch
(
Throwable
e
)
{
log
.
error
(
"Calc bit map error! topic={}, offset={}, queueId={}, {}"
,
request
.
getTopic
(),
request
.
getCommitLogOffset
(),
request
.
getQueueId
(),
e
);
...
...
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
浏览文件 @
9b0d037d
...
...
@@ -78,8 +78,8 @@ public abstract class ServiceThread implements Runnable {
if
(!
this
.
thread
.
isDaemon
())
{
this
.
thread
.
join
(
this
.
getJointime
());
}
long
e
clipse
Time
=
System
.
currentTimeMillis
()
-
beginTime
;
log
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" e
clipse time(ms) "
+
eclipse
Time
+
" "
long
e
lapsed
Time
=
System
.
currentTimeMillis
()
-
beginTime
;
log
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" e
lapsed time(ms) "
+
elapsed
Time
+
" "
+
this
.
getJointime
());
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"Interrupted"
,
e
);
...
...
common/src/main/java/org/apache/rocketmq/common/UtilAll.java
浏览文件 @
9b0d037d
...
...
@@ -93,7 +93,7 @@ public class UtilAll {
return
nf
.
format
(
offset
);
}
public
static
long
computeE
clipse
TimeMilliseconds
(
final
long
beginTime
)
{
public
static
long
computeE
lapsed
TimeMilliseconds
(
final
long
beginTime
)
{
return
System
.
currentTimeMillis
()
-
beginTime
;
}
...
...
@@ -124,7 +124,7 @@ public class UtilAll {
cal
.
get
(
Calendar
.
MILLISECOND
));
}
public
static
long
computNextMorningTimeMillis
()
{
public
static
long
comput
e
NextMorningTimeMillis
()
{
Calendar
cal
=
Calendar
.
getInstance
();
cal
.
setTimeInMillis
(
System
.
currentTimeMillis
());
cal
.
add
(
Calendar
.
DAY_OF_MONTH
,
1
);
...
...
@@ -136,7 +136,7 @@ public class UtilAll {
return
cal
.
getTimeInMillis
();
}
public
static
long
computNextMinutesTimeMillis
()
{
public
static
long
comput
e
NextMinutesTimeMillis
()
{
Calendar
cal
=
Calendar
.
getInstance
();
cal
.
setTimeInMillis
(
System
.
currentTimeMillis
());
cal
.
add
(
Calendar
.
DAY_OF_MONTH
,
0
);
...
...
@@ -148,7 +148,7 @@ public class UtilAll {
return
cal
.
getTimeInMillis
();
}
public
static
long
computNextHourTimeMillis
()
{
public
static
long
comput
e
NextHourTimeMillis
()
{
Calendar
cal
=
Calendar
.
getInstance
();
cal
.
setTimeInMillis
(
System
.
currentTimeMillis
());
cal
.
add
(
Calendar
.
DAY_OF_MONTH
,
0
);
...
...
@@ -160,7 +160,7 @@ public class UtilAll {
return
cal
.
getTimeInMillis
();
}
public
static
long
computNextHalfHourTimeMillis
()
{
public
static
long
comput
e
NextHalfHourTimeMillis
()
{
Calendar
cal
=
Calendar
.
getInstance
();
cal
.
setTimeInMillis
(
System
.
currentTimeMillis
());
cal
.
add
(
Calendar
.
DAY_OF_MONTH
,
0
);
...
...
common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
浏览文件 @
9b0d037d
...
...
@@ -51,7 +51,7 @@ public class MomentStatsItem {
}
catch
(
Throwable
e
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
5
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
5
,
TimeUnit
.
MILLISECONDS
);
}
public
void
printAtMinutes
()
{
...
...
common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
浏览文件 @
9b0d037d
...
...
@@ -58,7 +58,7 @@ public class MomentStatsItemSet {
}
catch
(
Throwable
ignored
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
5
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
5
,
TimeUnit
.
MILLISECONDS
);
}
private
void
printAtMinutes
()
{
...
...
common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
浏览文件 @
9b0d037d
...
...
@@ -127,7 +127,7 @@ public class StatsItem {
}
catch
(
Throwable
ignored
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
,
TimeUnit
.
MILLISECONDS
);
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
...
...
@@ -137,7 +137,7 @@ public class StatsItem {
}
catch
(
Throwable
ignored
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextHourTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
60
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextHourTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
60
,
TimeUnit
.
MILLISECONDS
);
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
...
...
@@ -147,7 +147,7 @@ public class StatsItem {
}
catch
(
Throwable
ignored
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextMorningTimeMillis
()
-
System
.
currentTimeMillis
())
-
2000
,
1000
*
60
*
60
*
24
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextMorningTimeMillis
()
-
System
.
currentTimeMillis
())
-
2000
,
1000
*
60
*
60
*
24
,
TimeUnit
.
MILLISECONDS
);
}
public
void
samplingInSeconds
()
{
...
...
common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
浏览文件 @
9b0d037d
...
...
@@ -81,7 +81,7 @@ public class StatsItemSet {
}
catch
(
Throwable
ignored
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextMinutesTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
,
TimeUnit
.
MILLISECONDS
);
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
...
...
@@ -91,7 +91,7 @@ public class StatsItemSet {
}
catch
(
Throwable
ignored
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextHourTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
60
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextHourTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
60
,
TimeUnit
.
MILLISECONDS
);
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
...
...
@@ -101,7 +101,7 @@ public class StatsItemSet {
}
catch
(
Throwable
ignored
)
{
}
}
},
Math
.
abs
(
UtilAll
.
computNextMorningTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
60
*
24
,
TimeUnit
.
MILLISECONDS
);
},
Math
.
abs
(
UtilAll
.
comput
e
NextMorningTimeMillis
()
-
System
.
currentTimeMillis
()),
1000
*
60
*
60
*
24
,
TimeUnit
.
MILLISECONDS
);
}
private
void
samplingInSeconds
()
{
...
...
common/src/test/java/org/apache/rocketmq/common/protocol/QueryConsumeTimeSpanBodyTest.java
浏览文件 @
9b0d037d
...
...
@@ -103,8 +103,8 @@ public class QueryConsumeTimeSpanBodyTest {
List
<
QueueTimeSpan
>
queueTimeSpans
=
new
ArrayList
<
QueueTimeSpan
>();
QueueTimeSpan
queueTimeSpan
=
new
QueueTimeSpan
();
queueTimeSpan
.
setMinTimeStamp
(
System
.
currentTimeMillis
());
queueTimeSpan
.
setMaxTimeStamp
(
UtilAll
.
computNextHourTimeMillis
());
queueTimeSpan
.
setConsumeTimeStamp
(
UtilAll
.
computNextMinutesTimeMillis
());
queueTimeSpan
.
setMaxTimeStamp
(
UtilAll
.
comput
e
NextHourTimeMillis
());
queueTimeSpan
.
setConsumeTimeStamp
(
UtilAll
.
comput
e
NextMinutesTimeMillis
());
queueTimeSpan
.
setDelayTime
(
5000
l
);
MessageQueue
messageQueue
=
new
MessageQueue
(
UUID
.
randomUUID
().
toString
(),
UUID
.
randomUUID
().
toString
(),
new
Random
().
nextInt
());
queueTimeSpan
.
setMessageQueue
(
messageQueue
);
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
浏览文件 @
9b0d037d
...
...
@@ -62,8 +62,8 @@ public abstract class ServiceThread implements Runnable {
long
beginTime
=
System
.
currentTimeMillis
();
this
.
thread
.
join
(
this
.
getJointime
());
long
e
clipse
Time
=
System
.
currentTimeMillis
()
-
beginTime
;
log
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" e
clipse time(ms) "
+
eclipse
Time
+
" "
long
e
lapsed
Time
=
System
.
currentTimeMillis
()
-
beginTime
;
log
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" e
lapsed time(ms) "
+
elapsed
Time
+
" "
+
this
.
getJointime
());
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"Interrupted"
,
e
);
...
...
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
浏览文件 @
9b0d037d
...
...
@@ -176,10 +176,10 @@ public class AllocateMappedFileService extends ServiceThread {
mappedFile
=
new
MappedFile
(
req
.
getFilePath
(),
req
.
getFileSize
());
}
long
e
clipseTime
=
UtilAll
.
computeEclipse
TimeMilliseconds
(
beginTime
);
if
(
e
clipse
Time
>
10
)
{
long
e
lapsedTime
=
UtilAll
.
computeElapsed
TimeMilliseconds
(
beginTime
);
if
(
e
lapsed
Time
>
10
)
{
int
queueSize
=
this
.
requestQueue
.
size
();
log
.
warn
(
"create mappedFile spent time(ms) "
+
e
clipse
Time
+
" queue size "
+
queueSize
log
.
warn
(
"create mappedFile spent time(ms) "
+
e
lapsed
Time
+
" queue size "
+
queueSize
+
" "
+
req
.
getFilePath
()
+
" "
+
req
.
getFileSize
());
}
...
...
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
9b0d037d
...
...
@@ -569,7 +569,7 @@ public class CommitLog {
}
}
long
e
clipse
TimeInLock
=
0
;
long
e
lapsed
TimeInLock
=
0
;
MappedFile
unlockMappedFile
=
null
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
...
...
@@ -619,14 +619,14 @@ public class CommitLog {
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
}
e
clipse
TimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
e
lapsed
TimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
beginTimeInLock
=
0
;
}
finally
{
putMessageLock
.
unlock
();
}
if
(
e
clipse
TimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
e
clipse
TimeInLock
,
msg
.
getBody
().
length
,
result
);
if
(
e
lapsed
TimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
e
lapsed
TimeInLock
,
msg
.
getBody
().
length
,
result
);
}
if
(
null
!=
unlockMappedFile
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isWarmMapedFileEnable
())
{
...
...
@@ -714,7 +714,7 @@ public class CommitLog {
return
new
PutMessageResult
(
PutMessageStatus
.
MESSAGE_ILLEGAL
,
null
);
}
long
e
clipse
TimeInLock
=
0
;
long
e
lapsed
TimeInLock
=
0
;
MappedFile
unlockMappedFile
=
null
;
MappedFile
mappedFile
=
this
.
mappedFileQueue
.
getLastMappedFile
();
...
...
@@ -769,14 +769,14 @@ public class CommitLog {
return
new
PutMessageResult
(
PutMessageStatus
.
UNKNOWN_ERROR
,
result
);
}
e
clipse
TimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
e
lapsed
TimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginLockTimestamp
;
beginTimeInLock
=
0
;
}
finally
{
putMessageLock
.
unlock
();
}
if
(
e
clipse
TimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
e
clipse
TimeInLock
,
messageExtBatch
.
getBody
().
length
,
result
);
if
(
e
lapsed
TimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
e
lapsed
TimeInLock
,
messageExtBatch
.
getBody
().
length
,
result
);
}
if
(
null
!=
unlockMappedFile
&&
this
.
defaultMessageStore
.
getMessageStoreConfig
().
isWarmMapedFileEnable
())
{
...
...
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
9b0d037d
...
...
@@ -394,11 +394,11 @@ public class DefaultMessageStore implements MessageStore {
long
beginTime
=
this
.
getSystemClock
().
now
();
PutMessageResult
result
=
this
.
commitLog
.
putMessage
(
msg
);
long
e
clipse
Time
=
this
.
getSystemClock
().
now
()
-
beginTime
;
if
(
e
clipse
Time
>
500
)
{
log
.
warn
(
"putMessage not in lock e
clipse time(ms)={}, bodyLength={}"
,
eclipse
Time
,
msg
.
getBody
().
length
);
long
e
lapsed
Time
=
this
.
getSystemClock
().
now
()
-
beginTime
;
if
(
e
lapsed
Time
>
500
)
{
log
.
warn
(
"putMessage not in lock e
lapsed time(ms)={}, bodyLength={}"
,
elapsed
Time
,
msg
.
getBody
().
length
);
}
this
.
storeStatsService
.
setPutMessageEntireTimeMax
(
e
clipse
Time
);
this
.
storeStatsService
.
setPutMessageEntireTimeMax
(
e
lapsed
Time
);
if
(
null
==
result
||
!
result
.
isOk
())
{
this
.
storeStatsService
.
getPutMessageFailedTimes
().
incrementAndGet
();
...
...
@@ -450,11 +450,11 @@ public class DefaultMessageStore implements MessageStore {
long
beginTime
=
this
.
getSystemClock
().
now
();
PutMessageResult
result
=
this
.
commitLog
.
putMessages
(
messageExtBatch
);
long
e
clipse
Time
=
this
.
getSystemClock
().
now
()
-
beginTime
;
if
(
e
clipse
Time
>
500
)
{
log
.
warn
(
"not in lock e
clipse time(ms)={}, bodyLength={}"
,
eclipse
Time
,
messageExtBatch
.
getBody
().
length
);
long
e
lapsed
Time
=
this
.
getSystemClock
().
now
()
-
beginTime
;
if
(
e
lapsed
Time
>
500
)
{
log
.
warn
(
"not in lock e
lapsed time(ms)={}, bodyLength={}"
,
elapsed
Time
,
messageExtBatch
.
getBody
().
length
);
}
this
.
storeStatsService
.
setPutMessageEntireTimeMax
(
e
clipse
Time
);
this
.
storeStatsService
.
setPutMessageEntireTimeMax
(
e
lapsed
Time
);
if
(
null
==
result
||
!
result
.
isOk
())
{
this
.
storeStatsService
.
getPutMessageFailedTimes
().
incrementAndGet
();
...
...
@@ -642,8 +642,8 @@ public class DefaultMessageStore implements MessageStore {
}
else
{
this
.
storeStatsService
.
getGetMessageTimesTotalMiss
().
incrementAndGet
();
}
long
e
clipse
Time
=
this
.
getSystemClock
().
now
()
-
beginTime
;
this
.
storeStatsService
.
setGetMessageEntireTimeMax
(
e
clipse
Time
);
long
e
lapsed
Time
=
this
.
getSystemClock
().
now
()
-
beginTime
;
this
.
storeStatsService
.
setGetMessageEntireTimeMax
(
e
lapsed
Time
);
getResult
.
setStatus
(
status
);
getResult
.
setNextBeginOffset
(
nextBeginOffset
);
...
...
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
浏览文件 @
9b0d037d
...
...
@@ -448,7 +448,7 @@ public class MappedFile extends ReferenceResource {
log
.
info
(
"delete file[REF:"
+
this
.
getRefCount
()
+
"] "
+
this
.
fileName
+
(
result
?
" OK, "
:
" Failed, "
)
+
"W:"
+
this
.
getWrotePosition
()
+
" M:"
+
this
.
getFlushedPosition
()
+
", "
+
UtilAll
.
computeE
clipse
TimeMilliseconds
(
beginTime
));
+
UtilAll
.
computeE
lapsed
TimeMilliseconds
(
beginTime
));
}
catch
(
Exception
e
)
{
log
.
warn
(
"close file channel "
+
this
.
fileName
+
" Failed. "
,
e
);
}
...
...
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
浏览文件 @
9b0d037d
...
...
@@ -411,7 +411,7 @@ public class DLedgerCommitLog extends CommitLog {
EncodeResult
encodeResult
;
putMessageLock
.
lock
();
//spin or ReentrantLock ,depending on store config
long
e
clipse
TimeInLock
;
long
e
lapsed
TimeInLock
;
long
queueOffset
;
try
{
beginTimeInDledgerLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
();
...
...
@@ -431,8 +431,8 @@ public class DLedgerCommitLog extends CommitLog {
long
wroteOffset
=
dledgerFuture
.
getPos
()
+
DLedgerEntry
.
BODY_OFFSET
;
ByteBuffer
buffer
=
ByteBuffer
.
allocate
(
MessageDecoder
.
MSG_ID_LENGTH
);
String
msgId
=
MessageDecoder
.
createMessageId
(
buffer
,
msg
.
getStoreHostBytes
(),
wroteOffset
);
e
clipse
TimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginTimeInDledgerLock
;
appendResult
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
encodeResult
.
data
.
length
,
msgId
,
System
.
currentTimeMillis
(),
queueOffset
,
e
clipse
TimeInLock
);
e
lapsed
TimeInLock
=
this
.
defaultMessageStore
.
getSystemClock
().
now
()
-
beginTimeInDledgerLock
;
appendResult
=
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
,
wroteOffset
,
encodeResult
.
data
.
length
,
msgId
,
System
.
currentTimeMillis
(),
queueOffset
,
e
lapsed
TimeInLock
);
switch
(
tranType
)
{
case
MessageSysFlag
.
TRANSACTION_PREPARED_TYPE
:
case
MessageSysFlag
.
TRANSACTION_ROLLBACK_TYPE
:
...
...
@@ -453,8 +453,8 @@ public class DLedgerCommitLog extends CommitLog {
putMessageLock
.
unlock
();
}
if
(
e
clipse
TimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
e
clipse
TimeInLock
,
msg
.
getBody
().
length
,
appendResult
);
if
(
e
lapsed
TimeInLock
>
500
)
{
log
.
warn
(
"[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}"
,
e
lapsed
TimeInLock
,
msg
.
getBody
().
length
,
appendResult
);
}
PutMessageStatus
putMessageStatus
=
PutMessageStatus
.
UNKNOWN_ERROR
;
...
...
store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
浏览文件 @
9b0d037d
...
...
@@ -77,7 +77,7 @@ public class IndexFile {
this
.
indexHeader
.
updateByteBuffer
();
this
.
mappedByteBuffer
.
force
();
this
.
mappedFile
.
release
();
log
.
info
(
"flush index file e
clipse
time(ms) "
+
(
System
.
currentTimeMillis
()
-
beginTime
));
log
.
info
(
"flush index file e
lapsed
time(ms) "
+
(
System
.
currentTimeMillis
()
-
beginTime
));
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录