Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
246be9eb
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
246be9eb
编写于
7月 05, 2017
作者:
S
shroman
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
ROCKETMQ-6: Use logger for exceptions instead of e.printStackTrace().
Signed-off-by:
N
shroman
<
rshtykh@yahoo.com
>
上级
9ad9ad06
变更
28
显示空白变更内容
内联
并排
Showing
28 changed file
with
227 addition
and
188 deletion
+227
-188
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
...pache/rocketmq/broker/processor/PullMessageProcessor.java
+38
-38
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
.../org/apache/rocketmq/broker/topic/TopicConfigManager.java
+21
-19
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
...rc/main/java/org/apache/rocketmq/common/BrokerConfig.java
+18
-12
common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
...c/main/java/org/apache/rocketmq/common/ConfigManager.java
+6
-6
common/src/main/java/org/apache/rocketmq/common/MixAll.java
common/src/main/java/org/apache/rocketmq/common/MixAll.java
+14
-13
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
...c/main/java/org/apache/rocketmq/common/ServiceThread.java
+9
-8
common/src/main/java/org/apache/rocketmq/common/UtilAll.java
common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+13
-3
common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
...ava/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+1
-1
common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
...ava/org/apache/rocketmq/common/namesrv/TopAddressing.java
+1
-0
common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
...a/org/apache/rocketmq/common/protocol/MQProtosHelper.java
+6
-1
filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
...n/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
+4
-4
namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
...cketmq/namesrv/processor/ClusterTestRequestProcessor.java
+1
-1
remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
...a/org/apache/rocketmq/remoting/common/RemotingHelper.java
+5
-1
remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
...ava/org/apache/rocketmq/remoting/common/RemotingUtil.java
+2
-6
remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
...va/org/apache/rocketmq/remoting/common/ServiceThread.java
+9
-8
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
...java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+1
-0
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
...apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+27
-27
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
.../org/apache/rocketmq/store/AllocateMappedFileService.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+33
-29
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
...src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
...e/src/main/java/org/apache/rocketmq/store/MappedFile.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
.../main/java/org/apache/rocketmq/store/StoreCheckpoint.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
...n/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
+7
-2
store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
.../main/java/org/apache/rocketmq/store/index/IndexFile.java
+2
-2
store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
...in/java/org/apache/rocketmq/store/index/IndexService.java
+1
-1
store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
...pache/rocketmq/store/schedule/ScheduleMessageService.java
+2
-1
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
...ain/java/org/apache/rocketmq/store/stats/BrokerStats.java
+1
-0
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
246be9eb
...
...
@@ -210,7 +210,7 @@ public class BrokerController {
this
.
messageStore
.
getDispatcherList
().
addFirst
(
new
CommitLogDispatcherCalcBitMap
(
this
.
brokerConfig
,
this
.
consumerFilterManager
));
}
catch
(
IOException
e
)
{
result
=
false
;
e
.
printStackTrace
(
);
log
.
error
(
"Failed to initialize"
,
e
);
}
}
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
浏览文件 @
246be9eb
...
...
@@ -67,7 +67,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
public
class
PullMessageProcessor
implements
NettyRequestProcessor
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
final
BrokerController
brokerController
;
private
List
<
ConsumeMessageHook
>
consumeMessageHookList
;
...
...
@@ -94,9 +94,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response
.
setOpaque
(
request
.
getOpaque
());
if
(
LOG
.
isDebugEnabled
())
{
LOG
.
debug
(
"receive PullMessage request command, {}"
,
request
);
}
log
.
debug
(
"receive PullMessage request command, {}"
,
request
);
if
(!
PermName
.
isReadable
(
this
.
brokerController
.
getBrokerConfig
().
getBrokerPermission
()))
{
response
.
setCode
(
ResponseCode
.
NO_PERMISSION
);
...
...
@@ -126,7 +124,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
requestHeader
.
getTopic
());
if
(
null
==
topicConfig
)
{
LOG
.
error
(
"The topic {} not exist, consumer: {}
"
,
requestHeader
.
getTopic
(),
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
log
.
error
(
"the topic {} not exist, consumer: {}
"
,
requestHeader
.
getTopic
(),
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
response
.
setCode
(
ResponseCode
.
TOPIC_NOT_EXIST
);
response
.
setRemark
(
String
.
format
(
"topic[%s] not exist, apply first please! %s"
,
requestHeader
.
getTopic
(),
FAQUrl
.
suggestTodo
(
FAQUrl
.
APPLY_TOPIC_URL
)));
return
response
;
...
...
@@ -141,7 +139,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
if
(
requestHeader
.
getQueueId
()
<
0
||
requestHeader
.
getQueueId
()
>=
topicConfig
.
getReadQueueNums
())
{
String
errorInfo
=
String
.
format
(
"queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]"
,
requestHeader
.
getQueueId
(),
requestHeader
.
getTopic
(),
topicConfig
.
getReadQueueNums
(),
channel
.
remoteAddress
());
LOG
.
warn
(
errorInfo
);
log
.
warn
(
errorInfo
);
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
errorInfo
);
return
response
;
...
...
@@ -162,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
assert
consumerFilterData
!=
null
;
}
}
catch
(
Exception
e
)
{
LOG
.
warn
(
"Parse the consumer's subscription[{}] failed, group: {}"
,
requestHeader
.
getSubscription
(),
//
log
.
warn
(
"Parse the consumer's subscription[{}] failed, group: {}"
,
requestHeader
.
getSubscription
(),
//
requestHeader
.
getConsumerGroup
());
response
.
setCode
(
ResponseCode
.
SUBSCRIPTION_PARSE_FAILED
);
response
.
setRemark
(
"parse the consumer's subscription failed"
);
...
...
@@ -172,7 +170,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
ConsumerGroupInfo
consumerGroupInfo
=
this
.
brokerController
.
getConsumerManager
().
getConsumerGroupInfo
(
requestHeader
.
getConsumerGroup
());
if
(
null
==
consumerGroupInfo
)
{
LOG
.
warn
(
"T
he consumer's group info not exist, group: {}"
,
requestHeader
.
getConsumerGroup
());
log
.
warn
(
"t
he consumer's group info not exist, group: {}"
,
requestHeader
.
getConsumerGroup
());
response
.
setCode
(
ResponseCode
.
SUBSCRIPTION_NOT_EXIST
);
response
.
setRemark
(
"the consumer's group info not exist"
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
SAME_GROUP_DIFFERENT_TOPIC
));
return
response
;
...
...
@@ -187,14 +185,14 @@ public class PullMessageProcessor implements NettyRequestProcessor {
subscriptionData
=
consumerGroupInfo
.
findSubscriptionData
(
requestHeader
.
getTopic
());
if
(
null
==
subscriptionData
)
{
LOG
.
warn
(
"T
he consumer's subscription not exist, group: {}, topic:{}"
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
());
log
.
warn
(
"t
he consumer's subscription not exist, group: {}, topic:{}"
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
());
response
.
setCode
(
ResponseCode
.
SUBSCRIPTION_NOT_EXIST
);
response
.
setRemark
(
"the consumer's subscription not exist"
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
SAME_GROUP_DIFFERENT_TOPIC
));
return
response
;
}
if
(
subscriptionData
.
getSubVersion
()
<
requestHeader
.
getSubVersion
())
{
LOG
.
warn
(
"The broker's subscription is not latest, group: {} {}"
,
requestHeader
.
getConsumerGroup
(),
log
.
warn
(
"The broker's subscription is not latest, group: {} {}"
,
requestHeader
.
getConsumerGroup
(),
subscriptionData
.
getSubString
());
response
.
setCode
(
ResponseCode
.
SUBSCRIPTION_NOT_LATEST
);
response
.
setRemark
(
"the consumer's subscription not latest"
);
...
...
@@ -209,7 +207,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return
response
;
}
if
(
consumerFilterData
.
getClientVersion
()
<
requestHeader
.
getSubVersion
())
{
LOG
.
warn
(
"The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}"
,
log
.
warn
(
"The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}"
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
consumerFilterData
.
getClientVersion
(),
requestHeader
.
getSubVersion
());
response
.
setCode
(
ResponseCode
.
FILTER_DATA_NOT_LATEST
);
response
.
setRemark
(
"the consumer's consumer filter data not latest"
);
...
...
@@ -287,7 +285,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response
.
setCode
(
ResponseCode
.
PULL_OFFSET_MOVED
);
// XXX: warn and notify me
LOG
.
info
(
"the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}"
,
//
log
.
info
(
"the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}"
,
//
requestHeader
.
getQueueOffset
(),
//
getMessageResult
.
getNextBeginOffset
(),
//
requestHeader
.
getTopic
(),
//
...
...
@@ -307,14 +305,15 @@ public class PullMessageProcessor implements NettyRequestProcessor {
case
OFFSET_OVERFLOW_BADLY:
response
.
setCode
(
ResponseCode
.
PULL_OFFSET_MOVED
);
// XXX: warn and notify me
LOG
.
info
(
"The request offset:{} over flow badly, broker max offset:{} , consumer: {}"
,
requestHeader
.
getQueueOffset
(),
getMessageResult
.
getMaxOffset
(),
channel
.
remoteAddress
());
log
.
info
(
"the request offset: {} over flow badly, broker max offset: {}, consumer: {}"
,
requestHeader
.
getQueueOffset
(),
getMessageResult
.
getMaxOffset
(),
channel
.
remoteAddress
());
break
;
case
OFFSET_OVERFLOW_ONE:
response
.
setCode
(
ResponseCode
.
PULL_NOT_FOUND
);
break
;
case
OFFSET_TOO_SMALL:
response
.
setCode
(
ResponseCode
.
PULL_OFFSET_MOVED
);
LOG
.
info
(
"The request offset is
too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}"
,
log
.
info
(
"the request offset
too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}"
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueOffset
(),
getMessageResult
.
getMinOffset
(),
channel
.
remoteAddress
());
break
;
...
...
@@ -391,12 +390,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
public
void
operationComplete
(
ChannelFuture
future
)
throws
Exception
{
getMessageResult
.
release
();
if
(!
future
.
isSuccess
())
{
LOG
.
error
(
"Fail to transfer messages from page cache to
{}"
,
channel
.
remoteAddress
(),
future
.
cause
());
log
.
error
(
"transfer many message by pagecache failed,
{}"
,
channel
.
remoteAddress
(),
future
.
cause
());
}
}
});
}
catch
(
Throwable
e
)
{
LOG
.
error
(
"Error occurred when transferring messages from page cache
"
,
e
);
log
.
error
(
"transfer many message by pagecache exception
"
,
e
);
getMessageResult
.
release
();
}
...
...
@@ -437,14 +436,14 @@ public class PullMessageProcessor implements NettyRequestProcessor {
event
.
setOffsetRequest
(
requestHeader
.
getQueueOffset
());
event
.
setOffsetNew
(
getMessageResult
.
getNextBeginOffset
());
this
.
generateOffsetMovedEvent
(
event
);
LOG
.
warn
(
log
.
warn
(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}"
,
requestHeader
.
getTopic
(),
requestHeader
.
getConsumerGroup
(),
event
.
getOffsetRequest
(),
event
.
getOffsetNew
(),
responseHeader
.
getSuggestWhichBrokerId
());
}
else
{
responseHeader
.
setSuggestWhichBrokerId
(
subscriptionGroupConfig
.
getBrokerId
());
response
.
setCode
(
ResponseCode
.
PULL_RETRY_IMMEDIATELY
);
LOG
.
warn
(
"PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}"
,
log
.
warn
(
"PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}"
,
requestHeader
.
getTopic
(),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getQueueOffset
(),
responseHeader
.
getSuggestWhichBrokerId
());
}
...
...
@@ -525,7 +524,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
PutMessageResult
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessage
(
msgInner
);
}
catch
(
Exception
e
)
{
LOG
.
warn
(
String
.
format
(
"G
enerateOffsetMovedEvent Exception, %s"
,
event
.
toString
()),
e
);
log
.
warn
(
String
.
format
(
"g
enerateOffsetMovedEvent Exception, %s"
,
event
.
toString
()),
e
);
}
}
...
...
@@ -544,20 +543,21 @@ public class PullMessageProcessor implements NettyRequestProcessor {
@Override
public
void
operationComplete
(
ChannelFuture
future
)
throws
Exception
{
if
(!
future
.
isSuccess
())
{
LOG
.
error
(
"ProcessRequestWrapper response to {} failed"
,
future
.
channel
().
remoteAddress
(),
future
.
cause
());
LOG
.
error
(
request
.
toString
());
LOG
.
error
(
response
.
toString
());
log
.
error
(
"processRequestWrapper response to {} failed"
,
future
.
channel
().
remoteAddress
(),
future
.
cause
());
log
.
error
(
request
.
toString
());
log
.
error
(
response
.
toString
());
}
}
});
}
catch
(
Throwable
e
)
{
LOG
.
error
(
"P
rocessRequestWrapper process request over, but response failed"
,
e
);
LOG
.
error
(
request
.
toString
());
LOG
.
error
(
response
.
toString
());
log
.
error
(
"p
rocessRequestWrapper process request over, but response failed"
,
e
);
log
.
error
(
request
.
toString
());
log
.
error
(
response
.
toString
());
}
}
}
catch
(
RemotingCommandException
e1
)
{
LOG
.
error
(
"Exe
cuteRequestWhenWakeup run"
,
e1
);
log
.
error
(
"ex
cuteRequestWhenWakeup run"
,
e1
);
}
}
};
...
...
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
浏览文件 @
246be9eb
...
...
@@ -41,7 +41,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
public
class
TopicConfigManager
extends
ConfigManager
{
private
static
final
Logger
LOG
=
LoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
static
final
long
LOCK_TIMEOUT_MILLIS
=
3000
;
private
transient
final
Lock
lockTopicConfigTable
=
new
ReentrantLock
();
...
...
@@ -181,15 +181,17 @@ public class TopicConfigManager extends ConfigManager {
topicConfig
.
setTopicSysFlag
(
topicSysFlag
);
topicConfig
.
setTopicFilterType
(
defaultTopicConfig
.
getTopicFilterType
());
}
else
{
LOG
.
warn
(
"Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]"
,
log
.
warn
(
"Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]"
,
defaultTopic
,
defaultTopicConfig
.
getPerm
(),
remoteAddress
);
}
}
else
{
LOG
.
warn
(
"Create new topic failed, because the default topic[{}] not exist. producer:[{}]"
,
defaultTopic
,
remoteAddress
);
log
.
warn
(
"Create new topic failed, because the default topic[{}] not exist. producer:[{}]"
,
defaultTopic
,
remoteAddress
);
}
if
(
topicConfig
!=
null
)
{
LOG
.
info
(
"Create new topic by default topic:[{}] config:[{}] producer:[{}]"
,
defaultTopic
,
topicConfig
,
remoteAddress
);
log
.
info
(
"Create new topic by default topic:[{}] config:[{}] producer:[{}]"
,
defaultTopic
,
topicConfig
,
remoteAddress
);
this
.
topicConfigTable
.
put
(
topic
,
topicConfig
);
...
...
@@ -204,7 +206,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
}
catch
(
InterruptedException
e
)
{
LOG
.
error
(
"createTopicInSendMessageMethod exception"
,
e
);
log
.
error
(
"createTopicInSendMessageMethod exception"
,
e
);
}
if
(
createNew
)
{
...
...
@@ -238,7 +240,7 @@ public class TopicConfigManager extends ConfigManager {
topicConfig
.
setPerm
(
perm
);
topicConfig
.
setTopicSysFlag
(
topicSysFlag
);
LOG
.
info
(
"create new topic {}"
,
topicConfig
);
log
.
info
(
"create new topic {}"
,
topicConfig
);
this
.
topicConfigTable
.
put
(
topic
,
topicConfig
);
createNew
=
true
;
this
.
dataVersion
.
nextVersion
();
...
...
@@ -248,7 +250,7 @@ public class TopicConfigManager extends ConfigManager {
}
}
}
catch
(
InterruptedException
e
)
{
LOG
.
error
(
"createTopicInSendMessageBackMethod exception"
,
e
);
log
.
error
(
"createTopicInSendMessageBackMethod exception"
,
e
);
}
if
(
createNew
)
{
...
...
@@ -269,7 +271,7 @@ public class TopicConfigManager extends ConfigManager {
topicConfig
.
setTopicSysFlag
(
TopicSysFlag
.
clearUnitFlag
(
oldTopicSysFlag
));
}
LOG
.
info
(
"update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag"
,
oldTopicSysFlag
,
log
.
info
(
"update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag"
,
oldTopicSysFlag
,
topicConfig
.
getTopicSysFlag
());
this
.
topicConfigTable
.
put
(
topic
,
topicConfig
);
...
...
@@ -289,7 +291,7 @@ public class TopicConfigManager extends ConfigManager {
topicConfig
.
setTopicSysFlag
(
TopicSysFlag
.
setUnitSubFlag
(
oldTopicSysFlag
));
}
LOG
.
info
(
"update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag"
,
oldTopicSysFlag
,
log
.
info
(
"update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag"
,
oldTopicSysFlag
,
topicConfig
.
getTopicSysFlag
());
this
.
topicConfigTable
.
put
(
topic
,
topicConfig
);
...
...
@@ -304,9 +306,9 @@ public class TopicConfigManager extends ConfigManager {
public
void
updateTopicConfig
(
final
TopicConfig
topicConfig
)
{
TopicConfig
old
=
this
.
topicConfigTable
.
put
(
topicConfig
.
getTopicName
(),
topicConfig
);
if
(
old
!=
null
)
{
LOG
.
info
(
"update topic config, old:[{}] new:[{}]"
,
old
,
topicConfig
);
log
.
info
(
"update topic config, old:[{}] new:[{}]"
,
old
,
topicConfig
);
}
else
{
LOG
.
info
(
"create new topic [{}]"
,
topicConfig
);
log
.
info
(
"create new topic [{}]"
,
topicConfig
);
}
this
.
dataVersion
.
nextVersion
();
...
...
@@ -324,7 +326,7 @@ public class TopicConfigManager extends ConfigManager {
if
(
topicConfig
!=
null
&&
!
topicConfig
.
isOrder
())
{
topicConfig
.
setOrder
(
true
);
isChange
=
true
;
LOG
.
info
(
"update order topic config, topic={}, order={}"
,
topic
,
true
);
log
.
info
(
"update order topic config, topic={}, order={}"
,
topic
,
true
);
}
}
...
...
@@ -335,7 +337,7 @@ public class TopicConfigManager extends ConfigManager {
if
(
topicConfig
.
isOrder
())
{
topicConfig
.
setOrder
(
false
);
isChange
=
true
;
LOG
.
info
(
"update order topic config, topic={}, order={}"
,
topic
,
false
);
log
.
info
(
"update order topic config, topic={}, order={}"
,
topic
,
false
);
}
}
}
...
...
@@ -359,11 +361,11 @@ public class TopicConfigManager extends ConfigManager {
public
void
deleteTopicConfig
(
final
String
topic
)
{
TopicConfig
old
=
this
.
topicConfigTable
.
remove
(
topic
);
if
(
old
!=
null
)
{
LOG
.
info
(
"Delete topic config OK, topic:
{}"
,
old
);
log
.
info
(
"delete topic config OK, topic:
{}"
,
old
);
this
.
dataVersion
.
nextVersion
();
this
.
persist
();
}
else
{
LOG
.
warn
(
"Delete topic config failed, topic:{} not exist
"
,
topic
);
log
.
warn
(
"delete topic config failed, topic: {} not exists
"
,
topic
);
}
}
...
...
@@ -409,7 +411,7 @@ public class TopicConfigManager extends ConfigManager {
Iterator
<
Entry
<
String
,
TopicConfig
>>
it
=
tcs
.
getTopicConfigTable
().
entrySet
().
iterator
();
while
(
it
.
hasNext
())
{
Entry
<
String
,
TopicConfig
>
next
=
it
.
next
();
LOG
.
info
(
"load exist local topic, {}"
,
next
.
getValue
().
toString
());
log
.
info
(
"load exist local topic, {}"
,
next
.
getValue
().
toString
());
}
}
...
...
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
浏览文件 @
246be9eb
...
...
@@ -16,13 +16,19 @@
*/
package
org.apache.rocketmq.common
;
import
java.net.InetAddress
;
import
java.net.UnknownHostException
;
import
org.apache.rocketmq.common.annotation.ImportantField
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.remoting.common.RemotingUtil
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.net.InetAddress
;
import
java.net.UnknownHostException
;
public
class
BrokerConfig
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
private
String
rocketmqHome
=
System
.
getProperty
(
MixAll
.
ROCKETMQ_HOME_PROPERTY
,
System
.
getenv
(
MixAll
.
ROCKETMQ_HOME_ENV
));
@ImportantField
private
String
namesrvAddr
=
System
.
getProperty
(
MixAll
.
NAMESRV_ADDR_PROPERTY
,
System
.
getenv
(
MixAll
.
NAMESRV_ADDR_ENV
));
...
...
@@ -121,16 +127,6 @@ public class BrokerConfig {
private
boolean
filterSupportRetry
=
false
;
private
boolean
enablePropertyFilter
=
false
;
public
static
String
localHostName
()
{
try
{
return
InetAddress
.
getLocalHost
().
getHostName
();
}
catch
(
UnknownHostException
e
)
{
e
.
printStackTrace
();
}
return
"DEFAULT_BROKER"
;
}
public
boolean
isTraceOn
()
{
return
traceOn
;
}
...
...
@@ -179,6 +175,16 @@ public class BrokerConfig {
this
.
slaveReadEnable
=
slaveReadEnable
;
}
public
static
String
localHostName
()
{
try
{
return
InetAddress
.
getLocalHost
().
getHostName
();
}
catch
(
UnknownHostException
e
)
{
log
.
error
(
"Failed to obtain the host name"
,
e
);
}
return
"DEFAULT_BROKER"
;
}
public
int
getRegisterBrokerTimeoutMills
()
{
return
registerBrokerTimeoutMills
;
}
...
...
common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
浏览文件 @
246be9eb
...
...
@@ -22,7 +22,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
public
abstract
class
ConfigManager
{
private
static
final
Logger
PLOG
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
public
abstract
String
encode
();
...
...
@@ -36,11 +36,11 @@ public abstract class ConfigManager {
return
this
.
loadBak
();
}
else
{
this
.
decode
(
jsonString
);
PLOG
.
info
(
"load {} OK"
,
fileName
);
log
.
info
(
"load {} OK"
,
fileName
);
return
true
;
}
}
catch
(
Exception
e
)
{
PLOG
.
error
(
"load "
+
fileName
+
" Failed, and try to load backup file"
,
e
);
log
.
error
(
"load [{}] failed, and try to load backup file"
,
fileName
,
e
);
return
this
.
loadBak
();
}
}
...
...
@@ -54,11 +54,11 @@ public abstract class ConfigManager {
String
jsonString
=
MixAll
.
file2String
(
fileName
+
".bak"
);
if
(
jsonString
!=
null
&&
jsonString
.
length
()
>
0
)
{
this
.
decode
(
jsonString
);
PLOG
.
info
(
"load "
+
fileName
+
" OK"
);
log
.
info
(
"load [{}] OK"
,
fileName
);
return
true
;
}
}
catch
(
Exception
e
)
{
PLOG
.
error
(
"load "
+
fileName
+
" Failed"
,
e
);
log
.
error
(
"load [{}] Failed"
,
fileName
,
e
);
return
false
;
}
...
...
@@ -74,7 +74,7 @@ public abstract class ConfigManager {
try
{
MixAll
.
string2File
(
jsonString
,
fileName
);
}
catch
(
IOException
e
)
{
PLOG
.
error
(
"persist file Exception, "
+
fileName
,
e
);
log
.
error
(
"persist file [{}] exception"
,
fileName
,
e
);
}
}
}
...
...
common/src/main/java/org/apache/rocketmq/common/MixAll.java
浏览文件 @
246be9eb
...
...
@@ -22,7 +22,6 @@ import java.io.FileInputStream;
import
java.io.FileWriter
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.io.UnsupportedEncodingException
;
import
java.lang.annotation.Annotation
;
import
java.lang.reflect.Field
;
import
java.lang.reflect.Method
;
...
...
@@ -43,10 +42,14 @@ import java.util.Properties;
import
java.util.Set
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.rocketmq.common.annotation.ImportantField
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
MixAll
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
public
static
final
String
ROCKETMQ_HOME_ENV
=
"ROCKETMQ_HOME"
;
public
static
final
String
ROCKETMQ_HOME_PROPERTY
=
"rocketmq.home.dir"
;
public
static
final
String
NAMESRV_ADDR_ENV
=
"NAMESRV_ADDR"
;
...
...
@@ -243,11 +246,11 @@ public class MixAll {
return
url
.
getPath
();
}
public
static
void
printObjectProperties
(
final
Logger
log
,
final
Object
object
)
{
printObjectProperties
(
log
,
object
,
false
);
public
static
void
printObjectProperties
(
final
Logger
log
ger
,
final
Object
object
)
{
printObjectProperties
(
log
ger
,
object
,
false
);
}
public
static
void
printObjectProperties
(
final
Logger
log
,
final
Object
object
,
final
boolean
onlyImportantField
)
{
public
static
void
printObjectProperties
(
final
Logger
log
ger
,
final
Object
object
,
final
boolean
onlyImportantField
)
{
Field
[]
fields
=
object
.
getClass
().
getDeclaredFields
();
for
(
Field
field
:
fields
)
{
if
(!
Modifier
.
isStatic
(
field
.
getModifiers
()))
{
...
...
@@ -261,7 +264,7 @@ public class MixAll {
value
=
""
;
}
}
catch
(
IllegalAccessException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Failed to obtain object properties"
,
e
);
}
if
(
onlyImportantField
)
{
...
...
@@ -271,8 +274,9 @@ public class MixAll {
}
}
if
(
log
!=
null
)
{
log
.
info
(
name
+
"="
+
value
);
if
(
logger
!=
null
)
{
logger
.
info
(
name
+
"="
+
value
);
}
else
{
}
}
}
...
...
@@ -294,11 +298,8 @@ public class MixAll {
try
{
InputStream
in
=
new
ByteArrayInputStream
(
str
.
getBytes
(
DEFAULT_CHARSET
));
properties
.
load
(
in
);
}
catch
(
UnsupportedEncodingException
e
)
{
e
.
printStackTrace
();
return
null
;
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Failed to handle properties"
,
e
);
return
null
;
}
...
...
@@ -318,7 +319,7 @@ public class MixAll {
field
.
setAccessible
(
true
);
value
=
field
.
get
(
object
);
}
catch
(
IllegalAccessException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Failed to handle properties"
,
e
);
}
if
(
value
!=
null
)
{
...
...
common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
浏览文件 @
246be9eb
...
...
@@ -23,7 +23,8 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
public
abstract
class
ServiceThread
implements
Runnable
{
private
static
final
Logger
STLOG
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
private
static
final
long
JOIN_TIME
=
90
*
1000
;
protected
final
Thread
thread
;
...
...
@@ -47,7 +48,7 @@ public abstract class ServiceThread implements Runnable {
public
void
shutdown
(
final
boolean
interrupt
)
{
this
.
stopped
=
true
;
STLOG
.
info
(
"shutdown thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
log
.
info
(
"shutdown thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
if
(
hasNotified
.
compareAndSet
(
false
,
true
))
{
waitPoint
.
countDown
();
// notify
...
...
@@ -63,10 +64,10 @@ public abstract class ServiceThread implements Runnable {
this
.
thread
.
join
(
this
.
getJointime
());
}
long
eclipseTime
=
System
.
currentTimeMillis
()
-
beginTime
;
STLOG
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" eclipse time(ms) "
+
eclipseTime
+
" "
log
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" eclipse time(ms) "
+
eclipseTime
+
" "
+
this
.
getJointime
());
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
}
...
...
@@ -80,7 +81,7 @@ public abstract class ServiceThread implements Runnable {
public
void
stop
(
final
boolean
interrupt
)
{
this
.
stopped
=
true
;
STLOG
.
info
(
"stop thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
log
.
info
(
"stop thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
if
(
hasNotified
.
compareAndSet
(
false
,
true
))
{
waitPoint
.
countDown
();
// notify
...
...
@@ -93,7 +94,7 @@ public abstract class ServiceThread implements Runnable {
public
void
makeStop
()
{
this
.
stopped
=
true
;
STLOG
.
info
(
"makestop thread "
+
this
.
getServiceName
());
log
.
info
(
"makestop thread "
+
this
.
getServiceName
());
}
public
void
wakeup
()
{
...
...
@@ -114,7 +115,7 @@ public abstract class ServiceThread implements Runnable {
try
{
waitPoint
.
await
(
interval
,
TimeUnit
.
MILLISECONDS
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
finally
{
hasNotified
.
set
(
false
);
this
.
onWaitEnd
();
...
...
common/src/main/java/org/apache/rocketmq/common/UtilAll.java
浏览文件 @
246be9eb
...
...
@@ -36,9 +36,16 @@ import java.util.Map;
import
java.util.zip.CRC32
;
import
java.util.zip.DeflaterOutputStream
;
import
java.util.zip.InflaterInputStream
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
UtilAll
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
public
static
final
String
YYYY_MM_DD_HH_MM_SS
=
"yyyy-MM-dd HH:mm:ss"
;
public
static
final
String
YYYY_MM_DD_HH_MM_SS_SSS
=
"yyyy-MM-dd#HH:mm:ss:SSS"
;
public
static
final
String
YYYYMMDDHHMMSS
=
"yyyyMMddHHmmss"
;
...
...
@@ -269,15 +276,18 @@ public class UtilAll {
}
finally
{
try
{
byteArrayInputStream
.
close
();
}
catch
(
IOException
ignored
)
{
}
catch
(
IOException
e
)
{
log
.
error
(
"Failed to close the stream"
,
e
);
}
try
{
inflaterInputStream
.
close
();
}
catch
(
IOException
ignored
)
{
}
catch
(
IOException
e
)
{
log
.
error
(
"Failed to close the stream"
,
e
);
}
try
{
byteArrayOutputStream
.
close
();
}
catch
(
IOException
ignored
)
{
}
catch
(
IOException
e
)
{
log
.
error
(
"Failed to close the stream"
,
e
);
}
}
...
...
common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
浏览文件 @
246be9eb
...
...
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
public
class
NamesrvConfig
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
NAMESRV_LOGGER_NAME
);
private
String
rocketmqHome
=
System
.
getProperty
(
MixAll
.
ROCKETMQ_HOME_PROPERTY
,
System
.
getenv
(
MixAll
.
ROCKETMQ_HOME_ENV
));
private
String
rocketmqHome
=
System
.
getProperty
(
MixAll
.
ROCKETMQ_HOME_PROPERTY
,
System
.
getenv
(
MixAll
.
ROCKETMQ_HOME_ENV
));
private
String
kvConfigPath
=
System
.
getProperty
(
"user.home"
)
+
File
.
separator
+
"namesrv"
+
File
.
separator
+
"kvConfig.json"
;
private
String
configStorePath
=
System
.
getProperty
(
"user.home"
)
+
File
.
separator
+
"namesrv"
+
File
.
separator
+
"namesrv.properties"
;
private
String
productEnvName
=
"center"
;
...
...
common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
浏览文件 @
246be9eb
...
...
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
public
class
TopAddressing
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
private
String
nsAddr
;
private
String
wsAddr
;
private
String
unitName
;
...
...
common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
浏览文件 @
246be9eb
...
...
@@ -17,11 +17,16 @@
package
org.apache.rocketmq.common.protocol
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
MQProtosHelper
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
COMMON_LOGGER_NAME
);
public
static
boolean
registerBrokerToNameServer
(
final
String
nsaddr
,
final
String
brokerAddr
,
final
long
timeoutMillis
)
{
RegisterBrokerRequestHeader
requestHeader
=
new
RegisterBrokerRequestHeader
();
...
...
@@ -36,7 +41,7 @@ public class MQProtosHelper {
return
ResponseCode
.
SUCCESS
==
response
.
getCode
();
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Failed to register broker"
,
e
);
}
return
false
;
...
...
filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java
浏览文件 @
246be9eb
...
...
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
public
class
DynaCode
{
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LoggerName
.
FILTERSRV_LOGGER_NAME
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
FILTERSRV_LOGGER_NAME
);
private
static
final
String
FILE_SP
=
System
.
getProperty
(
"file.separator"
);
...
...
@@ -231,7 +231,7 @@ public class DynaCode {
loadClass
.
put
(
getFullClassName
(
code
),
null
);
}
if
(
null
!=
srcFile
)
{
LOGGER
.
warn
(
"Dyna Create Java Source File:---->"
+
srcFile
.
getAbsolutePath
());
log
.
warn
(
"Dyna Create Java Source File:----> {}"
,
srcFile
.
getAbsolutePath
());
srcFileAbsolutePaths
.
add
(
srcFile
.
getAbsolutePath
());
srcFile
.
deleteOnExit
();
}
...
...
@@ -277,9 +277,9 @@ public class DynaCode {
Class
<?>
classz
=
classLoader
.
loadClass
(
key
);
if
(
null
!=
classz
)
{
loadClass
.
put
(
key
,
classz
);
LOGGER
.
info
(
"Dyna Load Java Class File OK:----> className: "
+
key
);
log
.
info
(
"Dyna Load Java Class File OK:----> className: {}"
,
key
);
}
else
{
LOGGER
.
error
(
"Dyna Load Java Class File Fail:----> className: "
+
key
);
log
.
error
(
"Dyna Load Java Class File Fail:----> className: {}"
,
key
);
}
}
}
...
...
namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
浏览文件 @
246be9eb
...
...
@@ -45,7 +45,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
try
{
adminExt
.
start
();
}
catch
(
MQClientException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Failed to start processor"
,
e
);
}
}
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
浏览文件 @
246be9eb
...
...
@@ -26,11 +26,15 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
RemotingHelper
{
public
static
final
String
ROCKETMQ_REMOTING
=
"RocketmqRemoting"
;
public
static
final
String
DEFAULT_CHARSET
=
"UTF-8"
;
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
ROCKETMQ_REMOTING
);
public
static
String
exceptionSimpleDesc
(
final
Throwable
e
)
{
StringBuffer
sb
=
new
StringBuffer
();
if
(
e
!=
null
)
{
...
...
@@ -126,7 +130,7 @@ public class RemotingHelper {
byteBufferBody
.
flip
();
return
RemotingCommand
.
decode
(
byteBufferBody
);
}
catch
(
IOException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"invokeSync failure"
,
e
);
if
(
sendRequestOK
)
{
throw
new
RemotingTimeoutException
(
addr
,
timeoutMillis
);
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
浏览文件 @
246be9eb
...
...
@@ -26,8 +26,6 @@ import java.net.InetAddress;
import
java.net.InetSocketAddress
;
import
java.net.NetworkInterface
;
import
java.net.SocketAddress
;
import
java.net.SocketException
;
import
java.net.UnknownHostException
;
import
java.nio.channels.Selector
;
import
java.nio.channels.SocketChannel
;
import
java.nio.channels.spi.SelectorProvider
;
...
...
@@ -130,10 +128,8 @@ public class RemotingUtil {
//If failed to find,fall back to localhost
final
InetAddress
localHost
=
InetAddress
.
getLocalHost
();
return
normalizeHostAddress
(
localHost
);
}
catch
(
SocketException
e
)
{
e
.
printStackTrace
();
}
catch
(
UnknownHostException
e
)
{
e
.
printStackTrace
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Failed to obtain local address"
,
e
);
}
return
null
;
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
浏览文件 @
246be9eb
...
...
@@ -23,7 +23,8 @@ import org.slf4j.LoggerFactory;
* Base class for background thread
*/
public
abstract
class
ServiceThread
implements
Runnable
{
private
static
final
Logger
STLOG
=
LoggerFactory
.
getLogger
(
RemotingHelper
.
ROCKETMQ_REMOTING
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
RemotingHelper
.
ROCKETMQ_REMOTING
);
private
static
final
long
JOIN_TIME
=
90
*
1000
;
protected
final
Thread
thread
;
protected
volatile
boolean
hasNotified
=
false
;
...
...
@@ -45,7 +46,7 @@ public abstract class ServiceThread implements Runnable {
public
void
shutdown
(
final
boolean
interrupt
)
{
this
.
stopped
=
true
;
STLOG
.
info
(
"shutdown thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
log
.
info
(
"shutdown thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
synchronized
(
this
)
{
if
(!
this
.
hasNotified
)
{
this
.
hasNotified
=
true
;
...
...
@@ -61,10 +62,10 @@ public abstract class ServiceThread implements Runnable {
long
beginTime
=
System
.
currentTimeMillis
();
this
.
thread
.
join
(
this
.
getJointime
());
long
eclipseTime
=
System
.
currentTimeMillis
()
-
beginTime
;
STLOG
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" eclipse time(ms) "
+
eclipseTime
+
" "
log
.
info
(
"join thread "
+
this
.
getServiceName
()
+
" eclipse time(ms) "
+
eclipseTime
+
" "
+
this
.
getJointime
());
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
}
...
...
@@ -78,7 +79,7 @@ public abstract class ServiceThread implements Runnable {
public
void
stop
(
final
boolean
interrupt
)
{
this
.
stopped
=
true
;
STLOG
.
info
(
"stop thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
log
.
info
(
"stop thread "
+
this
.
getServiceName
()
+
" interrupt "
+
interrupt
);
synchronized
(
this
)
{
if
(!
this
.
hasNotified
)
{
this
.
hasNotified
=
true
;
...
...
@@ -93,7 +94,7 @@ public abstract class ServiceThread implements Runnable {
public
void
makeStop
()
{
this
.
stopped
=
true
;
STLOG
.
info
(
"makestop thread "
+
this
.
getServiceName
());
log
.
info
(
"makestop thread "
+
this
.
getServiceName
());
}
public
void
wakeup
()
{
...
...
@@ -116,7 +117,7 @@ public abstract class ServiceThread implements Runnable {
try
{
this
.
wait
(
interval
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
finally
{
this
.
hasNotified
=
false
;
this
.
onWaitEnd
();
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
浏览文件 @
246be9eb
...
...
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
public
class
NettyDecoder
extends
LengthFieldBasedFrameDecoder
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
RemotingHelper
.
ROCKETMQ_REMOTING
);
private
static
final
int
FRAME_MAX_LENGTH
=
//
Integer
.
parseInt
(
System
.
getProperty
(
"com.rocketmq.remoting.frameMaxLength"
,
"16777216"
));
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
浏览文件 @
246be9eb
...
...
@@ -53,7 +53,7 @@ public abstract class NettyRemotingAbstract {
/**
* Remoting logger instance.
*/
private
static
final
Logger
PLOG
=
LoggerFactory
.
getLogger
(
RemotingHelper
.
ROCKETMQ_REMOTING
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
RemotingHelper
.
ROCKETMQ_REMOTING
);
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
...
...
@@ -175,17 +175,17 @@ public abstract class NettyRemotingAbstract {
try
{
ctx
.
writeAndFlush
(
response
);
}
catch
(
Throwable
e
)
{
PLOG
.
error
(
"process request over, but response failed"
,
e
);
PLOG
.
error
(
cmd
.
toString
());
PLOG
.
error
(
response
.
toString
());
log
.
error
(
"process request over, but response failed"
,
e
);
log
.
error
(
cmd
.
toString
());
log
.
error
(
response
.
toString
());
}
}
else
{
}
}
}
catch
(
Throwable
e
)
{
PLOG
.
error
(
"process request exception"
,
e
);
PLOG
.
error
(
cmd
.
toString
());
log
.
error
(
"process request exception"
,
e
);
log
.
error
(
cmd
.
toString
());
if
(!
cmd
.
isOnewayRPC
())
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
RemotingSysResponseCode
.
SYSTEM_ERROR
,
//
...
...
@@ -210,7 +210,7 @@ public abstract class NettyRemotingAbstract {
pair
.
getObject2
().
submit
(
requestTask
);
}
catch
(
RejectedExecutionException
e
)
{
if
((
System
.
currentTimeMillis
()
%
10000
)
==
0
)
{
PLOG
.
warn
(
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
())
//
log
.
warn
(
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
())
//
+
", too many requests and system thread pool busy, RejectedExecutionException "
//
+
pair
.
getObject2
().
toString
()
//
+
" request code: "
+
cmd
.
getCode
());
...
...
@@ -229,7 +229,7 @@ public abstract class NettyRemotingAbstract {
RemotingCommand
.
createResponseCommand
(
RemotingSysResponseCode
.
REQUEST_CODE_NOT_SUPPORTED
,
error
);
response
.
setOpaque
(
opaque
);
ctx
.
writeAndFlush
(
response
);
PLOG
.
error
(
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
())
+
error
);
log
.
error
(
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
())
+
error
);
}
}
...
...
@@ -254,8 +254,8 @@ public abstract class NettyRemotingAbstract {
responseFuture
.
putResponse
(
cmd
);
}
}
else
{
PLOG
.
warn
(
"receive response, but not matched any request, "
+
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
()));
PLOG
.
warn
(
cmd
.
toString
());
log
.
warn
(
"receive response, but not matched any request, "
+
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
()));
log
.
warn
(
cmd
.
toString
());
}
}
...
...
@@ -274,13 +274,13 @@ public abstract class NettyRemotingAbstract {
try
{
responseFuture
.
executeInvokeCallback
();
}
catch
(
Throwable
e
)
{
PLOG
.
warn
(
"execute callback in executor exception, and callback throw"
,
e
);
log
.
warn
(
"execute callback in executor exception, and callback throw"
,
e
);
}
}
});
}
catch
(
Exception
e
)
{
runInThisThread
=
true
;
PLOG
.
warn
(
"execute callback in executor exception, maybe executor busy"
,
e
);
log
.
warn
(
"execute callback in executor exception, maybe executor busy"
,
e
);
}
}
else
{
runInThisThread
=
true
;
...
...
@@ -290,7 +290,7 @@ public abstract class NettyRemotingAbstract {
try
{
responseFuture
.
executeInvokeCallback
();
}
catch
(
Throwable
e
)
{
PLOG
.
warn
(
"executeInvokeCallback Exception"
,
e
);
log
.
warn
(
"executeInvokeCallback Exception"
,
e
);
}
}
}
...
...
@@ -324,7 +324,7 @@ public abstract class NettyRemotingAbstract {
rep
.
release
();
it
.
remove
();
rfList
.
add
(
rep
);
PLOG
.
warn
(
"remove timeout request, "
+
rep
);
log
.
warn
(
"remove timeout request, "
+
rep
);
}
}
...
...
@@ -332,7 +332,7 @@ public abstract class NettyRemotingAbstract {
try
{
executeInvokeCallback
(
rf
);
}
catch
(
Throwable
e
)
{
PLOG
.
warn
(
"scanResponseTable, operationComplete Exception"
,
e
);
log
.
warn
(
"scanResponseTable, operationComplete Exception"
,
e
);
}
}
}
...
...
@@ -358,7 +358,7 @@ public abstract class NettyRemotingAbstract {
responseTable
.
remove
(
opaque
);
responseFuture
.
setCause
(
f
.
cause
());
responseFuture
.
putResponse
(
null
);
PLOG
.
warn
(
"send a request command to channel <"
+
addr
+
"> failed."
);
log
.
warn
(
"send a request command to channel <"
+
addr
+
"> failed."
);
}
});
...
...
@@ -404,17 +404,17 @@ public abstract class NettyRemotingAbstract {
try
{
executeInvokeCallback
(
responseFuture
);
}
catch
(
Throwable
e
)
{
PLOG
.
warn
(
"excute callback in writeAndFlush addListener, and callback throw"
,
e
);
log
.
warn
(
"excute callback in writeAndFlush addListener, and callback throw"
,
e
);
}
finally
{
responseFuture
.
release
();
}
PLOG
.
warn
(
"send a request command to channel <{}> failed."
,
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
log
.
warn
(
"send a request command to channel <{}> failed."
,
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
}
});
}
catch
(
Exception
e
)
{
responseFuture
.
release
();
PLOG
.
warn
(
"send a request command to channel <"
+
RemotingHelper
.
parseChannelRemoteAddr
(
channel
)
+
"> Exception"
,
e
);
log
.
warn
(
"send a request command to channel <"
+
RemotingHelper
.
parseChannelRemoteAddr
(
channel
)
+
"> Exception"
,
e
);
throw
new
RemotingSendRequestException
(
RemotingHelper
.
parseChannelRemoteAddr
(
channel
),
e
);
}
}
else
{
...
...
@@ -427,7 +427,7 @@ public abstract class NettyRemotingAbstract {
this
.
semaphoreAsync
.
getQueueLength
(),
//
this
.
semaphoreAsync
.
availablePermits
()
//
);
PLOG
.
warn
(
info
);
log
.
warn
(
info
);
throw
new
RemotingTimeoutException
(
info
);
}
}
...
...
@@ -445,13 +445,13 @@ public abstract class NettyRemotingAbstract {
public
void
operationComplete
(
ChannelFuture
f
)
throws
Exception
{
once
.
release
();
if
(!
f
.
isSuccess
())
{
PLOG
.
warn
(
"send a request command to channel <"
+
channel
.
remoteAddress
()
+
"> failed."
);
log
.
warn
(
"send a request command to channel <"
+
channel
.
remoteAddress
()
+
"> failed."
);
}
}
});
}
catch
(
Exception
e
)
{
once
.
release
();
PLOG
.
warn
(
"write send a request command to channel <"
+
channel
.
remoteAddress
()
+
"> failed."
);
log
.
warn
(
"write send a request command to channel <"
+
channel
.
remoteAddress
()
+
"> failed."
);
throw
new
RemotingSendRequestException
(
RemotingHelper
.
parseChannelRemoteAddr
(
channel
),
e
);
}
}
else
{
...
...
@@ -464,7 +464,7 @@ public abstract class NettyRemotingAbstract {
this
.
semaphoreOneway
.
getQueueLength
(),
//
this
.
semaphoreOneway
.
availablePermits
()
//
);
PLOG
.
warn
(
info
);
log
.
warn
(
info
);
throw
new
RemotingTimeoutException
(
info
);
}
}
...
...
@@ -478,13 +478,13 @@ public abstract class NettyRemotingAbstract {
if
(
this
.
eventQueue
.
size
()
<=
maxSize
)
{
this
.
eventQueue
.
add
(
event
);
}
else
{
PLOG
.
warn
(
"event queue size[{}] enough, so drop this event {}"
,
this
.
eventQueue
.
size
(),
event
.
toString
());
log
.
warn
(
"event queue size[{}] enough, so drop this event {}"
,
this
.
eventQueue
.
size
(),
event
.
toString
());
}
}
@Override
public
void
run
()
{
PLOG
.
info
(
this
.
getServiceName
()
+
" service started"
);
log
.
info
(
this
.
getServiceName
()
+
" service started"
);
final
ChannelEventListener
listener
=
NettyRemotingAbstract
.
this
.
getChannelEventListener
();
...
...
@@ -511,11 +511,11 @@ public abstract class NettyRemotingAbstract {
}
}
}
catch
(
Exception
e
)
{
PLOG
.
warn
(
this
.
getServiceName
()
+
" service has exception. "
,
e
);
log
.
warn
(
this
.
getServiceName
()
+
" service has exception. "
,
e
);
}
}
PLOG
.
info
(
this
.
getServiceName
()
+
" service end"
);
log
.
info
(
this
.
getServiceName
()
+
" service end"
);
}
@Override
...
...
store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
浏览文件 @
246be9eb
...
...
@@ -127,7 +127,7 @@ public class AllocateMappedFileService extends ServiceThread {
try
{
this
.
thread
.
join
(
this
.
getJointime
());
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
for
(
AllocateRequest
req
:
this
.
requestTable
.
values
())
{
...
...
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
246be9eb
...
...
@@ -903,35 +903,6 @@ public class CommitLog {
public
static
class
GroupCommitRequest
{
private
final
long
nextOffset
;
private
final
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
1
);
private
volatile
boolean
flushOK
=
false
;
public
GroupCommitRequest
(
long
nextOffset
)
{
this
.
nextOffset
=
nextOffset
;
}
public
long
getNextOffset
()
{
return
nextOffset
;
}
public
void
wakeupCustomer
(
final
boolean
flushOK
)
{
this
.
flushOK
=
flushOK
;
this
.
countDownLatch
.
countDown
();
}
public
boolean
waitForFlush
(
long
timeout
)
{
try
{
this
.
countDownLatch
.
await
(
timeout
,
TimeUnit
.
MILLISECONDS
);
return
this
.
flushOK
;
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
return
false
;
}
}
}
abstract
class
FlushCommitLogService
extends
ServiceThread
{
protected
static
final
int
RETRY_TIMES_OVER
=
10
;
}
...
...
@@ -1070,6 +1041,39 @@ public class CommitLog {
}
}
public
static
class
GroupCommitRequest
{
private
final
long
nextOffset
;
private
final
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
1
);
private
volatile
boolean
flushOK
=
false
;
public
GroupCommitRequest
(
long
nextOffset
)
{
this
.
nextOffset
=
nextOffset
;
}
public
long
getNextOffset
()
{
return
nextOffset
;
}
public
void
wakeupCustomer
(
final
boolean
flushOK
)
{
this
.
flushOK
=
flushOK
;
this
.
countDownLatch
.
countDown
();
}
public
boolean
waitForFlush
(
long
timeout
)
{
try
{
this
.
countDownLatch
.
await
(
timeout
,
TimeUnit
.
MILLISECONDS
);
return
this
.
flushOK
;
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"Interrupted"
,
e
);
return
false
;
}
}
}
/**
* GroupCommit Service
*/
...
...
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
浏览文件 @
246be9eb
...
...
@@ -25,9 +25,9 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
public
class
ConsumeQueue
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
STORE_LOGGER_NAME
);
public
static
final
int
CQ_STORE_UNIT_SIZE
=
20
;
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
STORE_LOGGER_NAME
);
private
static
final
Logger
LOG_ERROR
=
LoggerFactory
.
getLogger
(
LoggerName
.
STORE_ERROR_LOGGER_NAME
);
private
final
DefaultMessageStore
defaultMessageStore
;
...
...
store/src/main/java/org/apache/rocketmq/store/MappedFile.java
浏览文件 @
246be9eb
...
...
@@ -512,7 +512,7 @@ public class MappedFile extends ReferenceResource {
try
{
Thread
.
sleep
(
0
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
}
}
...
...
store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
浏览文件 @
246be9eb
...
...
@@ -71,7 +71,7 @@ public class StoreCheckpoint {
try
{
this
.
fileChannel
.
close
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Failed to properly close the channel"
,
e
);
}
}
...
...
store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
浏览文件 @
246be9eb
...
...
@@ -16,9 +16,14 @@
*/
package
org.apache.rocketmq.store.ha
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
java.util.HashMap
;
public
class
WaitNotifyObject
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
STORE_LOGGER_NAME
);
protected
final
HashMap
<
Long
/* thread id */
,
Boolean
/* notified */
>
waitingThreadTable
=
new
HashMap
<
Long
,
Boolean
>(
16
);
...
...
@@ -45,7 +50,7 @@ public class WaitNotifyObject {
try
{
this
.
wait
(
interval
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
finally
{
this
.
hasNotified
=
false
;
this
.
onWaitEnd
();
...
...
@@ -84,7 +89,7 @@ public class WaitNotifyObject {
try
{
this
.
wait
(
interval
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
finally
{
this
.
waitingThreadTable
.
put
(
currentThreadId
,
false
);
this
.
onWaitEnd
();
...
...
store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
浏览文件 @
246be9eb
...
...
@@ -147,7 +147,7 @@ public class IndexFile {
try
{
fileLock
.
release
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Failed to release the lock"
,
e
);
}
}
}
...
...
@@ -254,7 +254,7 @@ public class IndexFile {
try
{
fileLock
.
release
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Failed to release the lock"
,
e
);
}
}
...
...
store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
浏览文件 @
246be9eb
...
...
@@ -275,7 +275,7 @@ public class IndexService {
log
.
info
(
"Tried to create index file "
+
times
+
" times"
);
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
(
);
log
.
error
(
"Interrupted"
,
e
);
}
}
...
...
store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
浏览文件 @
246be9eb
...
...
@@ -44,8 +44,9 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
public
class
ScheduleMessageService
extends
ConfigManager
{
public
static
final
String
SCHEDULE_TOPIC
=
"SCHEDULE_TOPIC_XXXX"
;
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
STORE_LOGGER_NAME
);
public
static
final
String
SCHEDULE_TOPIC
=
"SCHEDULE_TOPIC_XXXX"
;
private
static
final
long
FIRST_DELAY_TIME
=
1000L
;
private
static
final
long
DELAY_FOR_A_WHILE
=
100L
;
private
static
final
long
DELAY_FOR_A_PERIOD
=
10000L
;
...
...
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
浏览文件 @
246be9eb
...
...
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
public
class
BrokerStats
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
final
DefaultMessageStore
defaultMessageStore
;
private
volatile
long
msgPutTotalYesterdayMorning
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录