提交 9f5383e5 编写于 作者: D duhenglucky

Add Snode server

上级 e42970b0
......@@ -58,6 +58,7 @@ import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.processor.SnodePullMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
......@@ -115,6 +116,7 @@ public class BrokerController {
private final ProducerManager producerManager;
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
private final SnodePullMessageProcessor snodePullMessageProcessor;
private final PullRequestHoldService pullRequestHoldService;
private final MessageArrivingListener messageArrivingListener;
private final Broker2Client broker2Client;
......@@ -184,7 +186,7 @@ public class BrokerController {
this.filterServerManager = new FilterServerManager(this);
this.slaveSynchronize = new SlaveSynchronize(this);
this.snodePullMessageProcessor = new SnodePullMessageProcessor(this);
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
......@@ -513,7 +515,8 @@ public class BrokerController {
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor,pullMessageExecutor);
this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* QueryMessageProcessor
*/
......@@ -671,6 +674,10 @@ public class BrokerController {
return pullMessageProcessor;
}
public SnodePullMessageProcessor getSnodePullMessageProcessor() {
return snodePullMessageProcessor;
}
public PullRequestHoldService getPullRequestHoldService() {
return pullRequestHoldService;
}
......
......@@ -60,9 +60,7 @@ public class BrokerStartup {
public static BrokerController start(BrokerController controller) {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
......@@ -100,8 +98,9 @@ public class BrokerStartup {
try {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
commandLine = ServerUtil.parseCmdLine("broker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
......@@ -260,6 +259,8 @@ public class BrokerStartup {
}
private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
......
......@@ -55,7 +55,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
private void scanExceptionChannel() {
this.brokerController.getProducerManager().scanNotActiveChannel();
this.brokerController.getConsumerManager().scanNotActiveChannel();
this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
public void shutdown() {
......
......@@ -138,8 +138,13 @@ public class PullRequestHoldService extends ServiceThread {
if (match) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} else {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
}
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
......@@ -149,8 +154,13 @@ public class PullRequestHoldService extends ServiceThread {
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} else {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
}
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
......
......@@ -74,6 +74,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
log.info("heart beat request:{}", heartbeatData);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
......@@ -121,6 +122,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
......
......@@ -353,7 +353,8 @@ public class MQClientAPIImpl {
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
String addrS = "localhost:11911";//TODO FIXME
RemotingCommand response = this.remotingClient.invokeSync(addrS, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
......@@ -557,14 +558,15 @@ public class MQClientAPIImpl {
}
public PullResult pullMessage(
final String addr,
String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
requestHeader.setEnodeAddr(addr);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
addr = "localhost:11911"; //TODO FIXME
switch (communicationMode) {
case ONEWAY:
assert false;
......@@ -647,7 +649,7 @@ public class MQClientAPIImpl {
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
log.info("response header: {}", responseHeader.getSuggestWhichBrokerId());
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
......@@ -728,11 +730,12 @@ public class MQClientAPIImpl {
final String consumerGroup,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
String addrS = "localhost:11911";//TODO FIXME
RemotingCommand response = this.remotingClient.invokeSync(addrS,
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
......
......@@ -537,7 +537,8 @@ public class MQClientInstance {
}
try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
String addrS = "localhost:11911"; //TODO FIXME
int version = this.mQClientAPIImpl.sendHearbeat(addrS, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
......@@ -547,6 +548,7 @@ public class MQClientInstance {
log.info(heartbeatData.toString());
}
} catch (Exception e) {
log.error("send heart beat error:{}",e);
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
} else {
......
......@@ -247,6 +247,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
*
* @return
*/
@Override
......@@ -440,13 +441,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* DEFAULT ASYNC -------------------------------------------------------
*/
public void send(Message msg,
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
......@@ -481,7 +483,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
......@@ -653,18 +654,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
......@@ -733,6 +733,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
requestHeader.setEnodeAddr(brokerAddr);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
......@@ -943,8 +944,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param mq
* @param sendCallback
......@@ -1064,8 +1066,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
* A new one will be provided in next version
* It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
* provided in next version
*
* @param msg
* @param selector
* @param arg
......@@ -1076,7 +1079,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* @throws InterruptedException
*/
@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor();
......@@ -1120,7 +1124,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
......@@ -1243,6 +1247,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}
public ExecutorService getCallbackExecutor() {
return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
......
......@@ -37,4 +37,5 @@ public class LoggerName {
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
public static final String SNODE_LOGGER_NAME = "RocketmqSnode";
}
package org.apache.rocketmq.common.namesrv;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class RegisterSnodeResult {
}
......@@ -167,4 +167,9 @@ public class RequestCode {
public static final int QUERY_CONSUME_QUEUE = 321;
public static final int QUERY_DATA_VERSION = 322;
public static final int REGISTER_SNODE = 350;
public static final int SNODE_PULL_MESSAGE = 351;
}
......@@ -20,7 +20,9 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
......@@ -59,6 +61,26 @@ public class ClusterInfo extends RemotingSerializable {
return addrs.toArray(new String[] {});
}
public String[] retrieveAllMasterAddrByCluster(String cluster) {
List<String> addrs = new ArrayList<String>();
if (clusterAddrTable.containsKey(cluster)) {
Set<String> brokerNames = clusterAddrTable.get(cluster);
for (String brokerName : brokerNames) {
BrokerData brokerData = brokerAddrTable.get(brokerName);
if (null != brokerData) {
HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
for (Map.Entry<Long, String> entry : brokerAddrs.entrySet()) {
if (MixAll.MASTER_ID == entry.getKey()) {
addrs.add(entry.getValue());
}
}
}
}
}
return addrs.toArray(new String[] {});
}
public String[] retrieveAllClusterNames() {
return clusterAddrTable.keySet().toArray(new String[] {});
}
......
......@@ -48,6 +48,16 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
private Long subVersion;
private String expressionType;
private String enodeAddr;
public String getEnodeAddr() {
return enodeAddr;
}
public void setEnodeAddr(String enodeAddr) {
this.enodeAddr = enodeAddr;
}
@Override
public void checkFields() throws RemotingCommandException {
}
......
......@@ -52,6 +52,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private boolean batch = false;
private Integer maxReconsumeTimes;
private String enodeAddr;
@Override
public void checkFields() throws RemotingCommandException {
}
......@@ -159,4 +161,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
public void setBatch(boolean batch) {
this.batch = batch;
}
public String getEnodeAddr() {
return enodeAddr;
}
public void setEnodeAddr(String enodeAddr) {
this.enodeAddr = enodeAddr;
}
}
......@@ -54,6 +54,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNullable
private boolean m; //batch
private String n; //enode addr
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a);
......@@ -69,6 +71,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
v1.setEnodeAddr(v2.n);
return v1;
}
......@@ -87,6 +90,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
v2.n = v1.getEnodeAddr();
return v2;
}
......@@ -197,4 +201,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
public void setM(boolean m) {
this.m = m;
}
public String getN() {
return n;
}
public void setN(String n) {
this.n = n;
}
}
\ No newline at end of file
package org.apache.rocketmq.common.protocol.header.namesrv;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class RegisterSnodeRequestHeader implements CommandCustomHeader {
@CFNotNull
private String snodeName;
@CFNotNull
private String snodeAddr;
@CFNotNull
private String clusterName;
@Override
public void checkFields() throws RemotingCommandException {
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public String getSnodeName() {
return snodeName;
}
public void setSnodeName(String snodeName) {
this.snodeName = snodeName;
}
public String getSnodeAddr() {
return snodeAddr;
}
public void setSnodeAddr(String snodeAddr) {
this.snodeAddr = snodeAddr;
}
}
package org.apache.rocketmq.common.protocol.header.namesrv;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class RegisterSnodeResponseHeader {
}
......@@ -28,6 +28,15 @@ public class HeartbeatData extends RemotingSerializable {
private String clientID;
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
private SnodeData snodeData;
public SnodeData getSnodeData() {
return snodeData;
}
public void setSnodeData(SnodeData snodeData) {
this.snodeData = snodeData;
}
public String getClientID() {
return clientID;
......@@ -53,9 +62,12 @@ public class HeartbeatData extends RemotingSerializable {
this.consumerDataSet = consumerDataSet;
}
@Override
public String toString() {
return "HeartbeatData [clientID=" + clientID + ", producerDataSet=" + producerDataSet
+ ", consumerDataSet=" + consumerDataSet + "]";
@Override public String toString() {
return "HeartbeatData{" +
"clientID='" + clientID + '\'' +
", producerDataSet=" + producerDataSet +
", consumerDataSet=" + consumerDataSet +
", snodeData='" + snodeData + '\'' +
'}';
}
}
package org.apache.rocketmq.common.protocol.heartbeat;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class SnodeData {
private String snodeName;
public String getSnodeName() {
return snodeName;
}
public void setSnodeName(String snodeName) {
this.snodeName = snodeName;
}
@Override public String toString() {
return "SnodeData{" +
"snodeName='" + snodeName + '\'' +
'}';
}
}
package org.apache.rocketmq.common.protocol.route;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class SnodeData {
private String snodeName;
private String addr;
private String clusterName;
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public String getSnodeName() {
return snodeName;
}
public void setSnodeName(String snodeName) {
this.snodeName = snodeName;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
@Override
public String toString() {
return "SnodeData{" +
"snodeName='" + snodeName + '\'' +
", addr='" + addr + '\'' +
", clusterName='" + clusterName + '\'' +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.utils;
import java.util.concurrent.atomic.AtomicInteger;
public class PositiveAtomicCounter {
private static final int MASK = 0x7FFFFFFF;
private final AtomicInteger atom;
public PositiveAtomicCounter() {
atom = new AtomicInteger(0);
}
public final int incrementAndGet() {
final int rt = atom.incrementAndGet();
return rt & MASK;
}
public int intValue() {
return atom.intValue();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="DefaultAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/snode_default.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/snode_default.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="RocketmqSnodeAppender_inner"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/snode.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/snode.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="RocketmqSnodeAppender" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="RocketmqSnodeAppender_inner"/>
<discardingThreshold>0</discardingThreshold>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<append>true</append>
<encoder>
<pattern>%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<logger name="RocketmqSnode" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqSnodeAppender"/>
<appender-ref ref="STDOUT"/>
</logger>
<logger name="RocketmqCommon" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqSnodeAppender"/>
</logger>
<logger name="RocketmqRemoting" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqSnodeAppender"/>
<appender-ref ref="STDOUT"/>
</logger>
<root>
<level value="debug"/>
<appender-ref ref="STDOUT"/>
<appender-ref ref="DefaultAppender"/>
</root>
</configuration>
......@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
......@@ -50,7 +51,7 @@ public class Producer {
*/
producer.start();
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < 10; i++) {
try {
/*
......@@ -76,6 +77,7 @@ public class Producer {
/*
* Shut down once the producer instance is not longer in use.
*/
Thread.sleep(100000000000L);
producer.shutdown();
}
}
......@@ -43,7 +43,6 @@ public class Producer {
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
......@@ -27,6 +27,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
......@@ -77,7 +78,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
......@@ -122,12 +122,26 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
case RequestCode.REGISTER_SNODE:
return this.registerSnode(ctx, request);
default:
break;
}
return null;
}
public RemotingCommand registerSnode(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final RegisterSnodeRequestHeader requestHeader =
(RegisterSnodeRequestHeader) request.decodeCommandCustomHeader(RegisterSnodeRequestHeader.class);
this.namesrvController.getRouteInfoManager().registerSnode(
requestHeader.getClusterName(),
requestHeader.getSnodeName(),
requestHeader.getSnodeAddr());
return response;
}
@Override
public boolean rejectRequest() {
return false;
......@@ -280,7 +294,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
log.info("requestHeader: " + requestHeader );
log.info("requestHeader: " + requestHeader);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
......
......@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
......@@ -54,6 +55,8 @@ public class RouteInfoManager {
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
private final HashMap<String/* snodeName*/, SnodeData> snodeTable;
private final HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
......@@ -61,6 +64,8 @@ public class RouteInfoManager {
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
this.snodeTable = new HashMap<>(256);
this.snodeCluster = new HashMap<>(256);
}
public byte[] getAllClusterInfo() {
......@@ -99,6 +104,32 @@ public class RouteInfoManager {
return topicList.encode();
}
public void registerSnode(
final String clusterName,
final String snodeName,
final String snodeAddr) {
try {
this.lock.writeLock().lockInterruptibly();
Set<String> snodeSet = this.snodeCluster.get(clusterName);
if (snodeSet == null) {
snodeSet = new HashSet<>();
snodeSet.add(snodeName);
this.snodeCluster.put(clusterName, snodeSet);
} else {
snodeSet.add(snodeName);
}
SnodeData snodeData = new SnodeData();
snodeData.setAddr(snodeAddr);
snodeData.setSnodeName(snodeName);
snodeData.setClusterName(clusterName);
snodeTable.put(snodeName, snodeData);
} catch (Exception ex) {
log.error("Register snode error", ex);
} finally {
this.lock.writeLock().unlock();
}
}
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
......
......@@ -125,6 +125,7 @@
<module>distribution</module>
<module>openmessaging</module>
<module>logging</module>
<module>rocketmq-snode</module>
</modules>
<build>
......
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>rocketmq-snode</artifactId>
<name>rocketmq-snode ${project.version}</name>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-store</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.rocketmq</groupId>-->
<!--<artifactId>rocketmq-broker</artifactId>-->
<!--</dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration>
</plugin>
</plugins>
</build>
</project>
package org.apache.rocketmq.snode;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ConsumerManager;
import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ProducerManager;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HearbeatProcessor;
import org.apache.rocketmq.snode.processor.PullMessageProcessor;
import org.apache.rocketmq.snode.processor.SendMessageProcessor;
import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.SnodeOuterService;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
import org.apache.rocketmq.snode.service.impl.SnodeOuterServiceImpl;
public class SnodeController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeConfig snodeConfig;
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
private RemotingServer snodeServer;
private ExecutorService sendMessageExcutor;
private ExecutorService heartbeatExecutor;
private ExecutorService pullMessageExcutor;
private SnodeOuterService snodeOuterService;
private ExecutorService consumerManagerExcutor;
private ScheduledService scheduledService;
private ProducerManager producerManager;
private ConsumerManager consumerManager;
private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
public SnodeController(NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig,
SnodeConfig snodeConfig) {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig;
this.snodeOuterService = SnodeOuterServiceImpl.getInstance(this);
this.scheduledService = new ScheduledServiceImpl(this.snodeOuterService, this.snodeConfig);
this.sendMessageExcutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodeSendMessageThread",
false);
this.pullMessageExcutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeHeartBeatCorePoolSize(),
snodeConfig.getSnodeHeartBeatMaxPoolSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
"SnodeHeartbeatThread",
true);
this.consumerManagerExcutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
"SnodePullMessageThread",
false);
if (this.snodeConfig.getNamesrvAddr() != null) {
this.snodeOuterService.updateNameServerAddressList(this.snodeConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.snodeConfig.getNamesrvAddr());
}
this.producerManager = new ProducerManager();
ConsumerIdsChangeListener consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(consumerIdsChangeListener);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
}
public SnodeConfig getSnodeConfig() {
return snodeConfig;
}
public boolean initialize() {
this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, null);
this.registerProcessor();
return true;
}
public void registerProcessor() {
snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this.snodeOuterService), sendMessageExcutor);
snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this.snodeOuterService), pullMessageExcutor);
snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
}
public void start() {
initialize();
this.snodeServer.start();
this.snodeOuterService.start();
this.scheduledService.startScheduleTask();
this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
}
public void shutdown() {
this.sendMessageExcutor.shutdown();
this.pullMessageExcutor.shutdown();
this.heartbeatExecutor.shutdown();
this.scheduledExecutorService.shutdown();
this.snodeOuterService.shutdown();
this.scheduledService.shutdown();
this.clientHousekeepingService.shutdown();
}
public ProducerManager getProducerManager() {
return producerManager;
}
public void setProducerManager(ProducerManager producerManager) {
this.producerManager = producerManager;
}
public RemotingServer getSnodeServer() {
return snodeServer;
}
public void setSnodeServer(RemotingServer snodeServer) {
this.snodeServer = snodeServer;
}
public ConsumerManager getConsumerManager() {
return consumerManager;
}
public void setConsumerManager(ConsumerManager consumerManager) {
this.consumerManager = consumerManager;
}
public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}
public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
this.subscriptionGroupManager = subscriptionGroupManager;
}
public NettyClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
public SnodeOuterService getSnodeOuterService() {
return snodeOuterService;
}
public void setSnodeOuterService(SnodeOuterService snodeOuterService) {
this.snodeOuterService = snodeOuterService;
}
}
package org.apache.rocketmq.snode;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class SnodeStartup {
private static InternalLogger log;
public static Properties properties = null;
public static CommandLine commandLine = null;
public static String configFile = null;
public static void main(String[] args) throws IOException, JoranException {
startup(createSnodeController(args));
}
public static SnodeController startup(SnodeController controller) {
try {
controller.start();
String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", "
+ controller.getSnodeConfig().getSnodeAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getSnodeConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getSnodeConfig().getNamesrvAddr();
}
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
public static SnodeController createSnodeController(String[] args) throws IOException, JoranException {
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("snode", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
final SnodeConfig snodeConfig = new SnodeConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(snodeConfig.getListenPort());
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, snodeConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
in.close();
}
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(snodeConfig.getRocketmqHome() + "/conf/logback_snode.xml");
log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
MixAll.printObjectProperties(log, snodeConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final SnodeController snodeController = new SnodeController(
nettyServerConfig,
nettyClientConfig,
snodeConfig);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
this.hasShutdown = true;
snodeController.shutdown();
}
}
}
}));
return snodeController;
}
private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
return options;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class ClientChannelInfo {
private final Channel channel;
private final String clientId;
private final LanguageCode language;
private final int version;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public ClientChannelInfo(Channel channel) {
this(channel, null, null, 0);
}
public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
this.channel = channel;
this.clientId = clientId;
this.language = language;
this.version = version;
}
public Channel getChannel() {
return channel;
}
public String getClientId() {
return clientId;
}
public LanguageCode getLanguage() {
return language;
}
public int getVersion() {
return version;
}
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((channel == null) ? 0 : channel.hashCode());
result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
result = prime * result + ((language == null) ? 0 : language.hashCode());
result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
result = prime * result + version;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ClientChannelInfo other = (ClientChannelInfo) obj;
if (channel == null) {
if (other.channel != null)
return false;
} else if (this.channel != other.channel) {
return false;
}
return true;
}
@Override
public String toString() {
return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
+ ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ProducerManager producerManager;
private final ConsumerManager consumerManager;
private ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
public ClientHousekeepingService(final ProducerManager producerManager, final ConsumerManager consumerManager) {
this.producerManager = producerManager;
this.consumerManager = consumerManager;
}
public void start(long interval) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ClientHousekeepingService.this.scanExceptionChannel();
} catch (Throwable e) {
log.error("Error occurred when scan not active client channels.", e);
}
}
}, 1000 * 10, interval, TimeUnit.MILLISECONDS);
}
private void scanExceptionChannel() {
this.producerManager.scanNotActiveChannel();
//this.consumerManager.scanNotActiveChannel();
}
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.producerManager.doChannelCloseEvent(remoteAddr, channel);
this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
public enum ConsumerGroupEvent {
/**
* Some consumers in the group are changed.
*/
CHANGE,
/**
* The group of consumer is unregistered.
*/
UNREGISTER,
/**
* The group of consumer is registered.
*/
REGISTER
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class ConsumerGroupInfo {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
ConsumeFromWhere consumeFromWhere) {
this.groupName = groupName;
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
}
public ClientChannelInfo findChannel(final String clientId) {
Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> next = it.next();
if (next.getValue().getClientId().equals(clientId)) {
return next.getValue();
}
}
return null;
}
public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() {
return subscriptionTable;
}
public ConcurrentMap<Channel, ClientChannelInfo> getChannelInfoTable() {
return channelInfoTable;
}
public List<Channel> getAllChannel() {
List<Channel> result = new ArrayList<>();
result.addAll(this.channelInfoTable.keySet());
return result;
}
public List<String> getAllClientId() {
List<String> result = new ArrayList<>();
Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> entry = it.next();
ClientChannelInfo clientChannelInfo = entry.getValue();
result.add(clientChannelInfo.getClientId());
}
return result;
}
public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
if (old != null) {
log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
}
}
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
final ClientChannelInfo info = this.channelInfoTable.remove(channel);
if (info != null) {
log.warn(
"NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
info.toString(), groupName);
return true;
}
return false;
}
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false;
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
if (null == infoOld) {
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) {
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString());
updated = true;
}
infoOld = infoNew;
} else {
if (!infoOld.getClientId().equals(infoNew.getClientId())) {
log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
this.groupName,
infoOld.toString(),
infoNew.toString());
this.channelInfoTable.put(infoNew.getChannel(), infoNew);
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
return updated;
}
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true;
log.info("subscription changed, add new topic, group: {} {}",
this.groupName,
sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
old.toString(),
sub.toString()
);
}
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
for (SubscriptionData sub : subList) {
if (sub.getTopic().equals(oldTopic)) {
exist = true;
break;
}
}
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",
this.groupName,
oldTopic,
next.getValue().toString()
);
it.remove();
updated = true;
}
}
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
public Set<String> getSubscribeTopics() {
return subscriptionTable.keySet();
}
public SubscriptionData findSubscriptionData(final String topic) {
return this.subscriptionTable.get(topic);
}
public ConsumeType getConsumeType() {
return consumeType;
}
public void setConsumeType(ConsumeType consumeType) {
this.consumeType = consumeType;
}
public MessageModel getMessageModel() {
return messageModel;
}
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
public String getGroupName() {
return groupName;
}
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
public ConsumeFromWhere getConsumeFromWhere() {
return consumeFromWhere;
}
public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
public interface ConsumerIdsChangeListener {
void handle(ConsumerGroupEvent event, String group, Object... args);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ConsumerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
private final ConsumerIdsChangeListener consumerIdsChangeListener;
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
this.consumerIdsChangeListener = consumerIdsChangeListener;
}
public ClientChannelInfo findChannel(final String group, final String clientId) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (consumerGroupInfo != null) {
return consumerGroupInfo.findChannel(clientId);
}
return null;
}
public SubscriptionData findSubscriptionData(final String group, final String topic) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
return consumerGroupInfo.findSubscriptionData(topic);
}
return null;
}
public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
return this.consumerTable.get(group);
}
public int findSubscriptionDataCount(final String group) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
return consumerGroupInfo.getSubscriptionTable().size();
}
return 0;
}
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
ConsumerGroupInfo info = next.getValue();
boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
if (removed) {
if (info.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
next.getKey());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
}
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null != consumerGroupInfo) {
consumerGroupInfo.unregisterChannel(clientChannelInfo);
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
public void scanNotActiveChannel() {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
String group = next.getKey();
ConsumerGroupInfo consumerGroupInfo = next.getValue();
ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
while (itChannel.hasNext()) {
Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
ClientChannelInfo clientChannelInfo = nextChannel.getValue();
long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
}
}
if (channelInfoTable.isEmpty()) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
group);
it.remove();
}
}
}
public HashSet<String> queryTopicConsumeByWho(final String topic) {
HashSet<String> groups = new HashSet<>();
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> entry = it.next();
ConcurrentMap<String, SubscriptionData> subscriptionTable =
entry.getValue().getSubscriptionTable();
if (subscriptionTable.containsKey(topic)) {
groups.add(entry.getKey());
}
}
return groups;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.snode.SnodeController;
//TODO Filter implementation
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
private final SnodeController snodeController;
public DefaultConsumerIdsChangeListener(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public void handle(ConsumerGroupEvent event, String group, Object... args) {
if (event == null) {
return;
}
switch (event) {
case CHANGE:
if (args == null || args.length < 1) {
return;
}
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && snodeController.getSnodeConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
this.snodeController.getSnodeOuterService().notifyConsumerIdsChanged(chl, group);
}
}
break;
case UNREGISTER:
// this.snodeController.getConsumerFilterManager().unRegister(group);
break;
case REGISTER:
if (args == null || args.length < 1) {
return;
}
// Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
// this.snodeController.getConsumerFilterManager().register(group, subscriptionDataList);
break;
default:
throw new RuntimeException("Unknown event " + event);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.PositiveAtomicCounter;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
public ProducerManager() {
}
public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
newGroupChannelTable.putAll(groupChannelTable);
} finally {
groupChannelLock.unlock();
}
}
} catch (InterruptedException e) {
log.error("", e);
}
return newGroupChannelTable;
}
public void scanNotActiveChannel() {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
for (final Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
final String group = entry.getKey();
final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Channel, ClientChannelInfo> item = it.next();
// final Integer id = item.getKey();
final ClientChannelInfo info = item.getValue();
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
RemotingUtil.closeChannel(info.getChannel());
}
}
}
} finally {
this.groupChannelLock.unlock();
}
} else {
log.warn("ProducerManager scanNotActiveChannel lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
}
public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
if (channel != null) {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
for (final Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
final String group = entry.getKey();
final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
entry.getValue();
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
}
}
} finally {
this.groupChannelLock.unlock();
}
} else {
log.warn("ProducerManager doChannelCloseEvent lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
}
}
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try {
ClientChannelInfo clientChannelInfoFound = null;
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new HashMap<>();
this.groupChannelTable.put(group, channelTable);
}
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
} finally {
this.groupChannelLock.unlock();
}
if (clientChannelInfoFound != null) {
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
}
} else {
log.warn("ProducerManager registerProducer lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
}
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
}
if (channelTable.isEmpty()) {
this.groupChannelTable.remove(group);
log.info("unregister a producer group[{}] from groupChannelTable", group);
}
}
} finally {
this.groupChannelLock.unlock();
}
} else {
log.warn("ProducerManager unregisterProducer lock timeout");
}
} catch (InterruptedException e) {
log.error("", e);
}
}
public Channel getAvaliableChannel(String groupId) {
HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
List<Channel> channelList = new ArrayList<Channel>();
if (channelClientChannelInfoHashMap != null) {
for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
channelList.add(channel);
}
int size = channelList.size();
if (0 == size) {
log.warn("Channel list is empty. groupId={}", groupId);
return null;
}
int index = positiveAtomicCounter.incrementAndGet() % size;
Channel channel = channelList.get(index);
int count = 0;
boolean isOk = channel.isActive() && channel.isWritable();
while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
if (isOk) {
return channel;
}
index = (++index) % size;
channel = channelList.get(index);
isOk = channel.isActive() && channel.isWritable();
}
} else {
log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
return null;
}
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.client;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.snode.SnodeController;
public class SubscriptionGroupManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
private final DataVersion dataVersion = new DataVersion();
private transient SnodeController snodeController;
public enum SUBSCRIPTION_EVENT {
CREATE,
UPDATE,
DELETE
}
public SubscriptionGroupManager() {
this.init();
}
public SubscriptionGroupManager(SnodeController snodeController) {
this.snodeController = snodeController;
this.init();
}
private void init() {
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
}
{
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
}
}
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
log.info("update subscription group config, old: {} new: {}", old, config);
} else {
log.info("create new subscription group, {}", config);
}
this.dataVersion.nextVersion();
this.persistToEnode(SUBSCRIPTION_EVENT.UPDATE, config);
}
public void disableConsume(final String groupName) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
if (old != null) {
old.setConsumeEnable(false);
this.dataVersion.nextVersion();
}
}
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
if (null == subscriptionGroupConfig) {
if (snodeController.getSnodeConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setGroupName(group);
SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
if (null == preConfig) {
log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
}
this.dataVersion.nextVersion();
this.persistToEnode(SUBSCRIPTION_EVENT.CREATE, subscriptionGroupConfig);
}
}
return subscriptionGroupConfig;
}
@Override
public String encode() {
return this.encode(false);
}
@Override
public String configFilePath() {
//TODO get subscription persist request code
return null;
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
if (obj != null) {
this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
this.dataVersion.assignNewOne(obj.dataVersion);
this.printLoadDataWhenFirstBoot(obj);
}
}
}
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}
private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionGroupConfig> next = it.next();
log.info("load exist subscription group, {}", next.getValue().toString());
}
}
public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
return subscriptionGroupTable;
}
public DataVersion getDataVersion() {
return dataVersion;
}
public void deleteSubscriptionGroupConfig(final String groupName) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
if (old != null) {
log.info("delete subscription group OK, subscription group:{}", old);
this.dataVersion.nextVersion();
this.persistToEnode(SUBSCRIPTION_EVENT.DELETE, old);
} else {
log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
}
}
void persistToEnode(SUBSCRIPTION_EVENT event, SubscriptionGroupConfig config) {
}
}
package org.apache.rocketmq.snode.config;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class SnodeConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
private String snodeName = "defaultNode";
private String snodeAddr = "127.0.0.1:11911";
private String clusterName = "defaultCluster";
private int snodeSendThreadPoolQueueCapacity = 10000;
private int snodeSendMessageMinPoolSize = 10;
private int snodeSendMessageMaxPoolSize = 20;
private int snodeHeartBeatCorePoolSize = 1;
private int snodeHeartBeatMaxPoolSize = 2;
private int snodeHeartBeatThreadPoolQueueCapacity = 1000;
private long snodeHeartBeatInterval = 30 * 1000;
private boolean fetechNameserver = false;
private long houseKeepingInterval = 10 * 1000;
private boolean notifyConsumerIdsChangedEnable = true;
private boolean autoCreateSubscriptionGroup = true;
private int listenPort = 11911;
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
this.snodeHeartBeatInterval = snodeHeartBeatInterval;
}
public long getHouseKeepingInterval() {
return houseKeepingInterval;
}
public void setHouseKeepingInterval(long houseKeepingInterval) {
this.houseKeepingInterval = houseKeepingInterval;
}
public boolean isFetechNameserver() {
return fetechNameserver;
}
public void setFetechNameserver(boolean fetechNameserver) {
this.fetechNameserver = fetechNameserver;
}
public long getSnodeHeartBeatInterval() {
return snodeHeartBeatInterval;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
/**
* This configurable item defines interval of topics registration of broker to name server. Allowing values are
* between 10, 000 and 60, 000 milliseconds.
*/
private int registerNameServerPeriod = 1000 * 30;
public int getRegisterNameServerPeriod() {
return registerNameServerPeriod;
}
public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
this.registerNameServerPeriod = registerNameServerPeriod;
}
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
public boolean isFetchNamesrvAddrByAddressServer() {
return fetchNamesrvAddrByAddressServer;
}
public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
}
public int getSnodeHeartBeatThreadPoolQueueCapacity() {
return snodeHeartBeatThreadPoolQueueCapacity;
}
public void setSnodeHeartBeatThreadPoolQueueCapacity(int snodeHeartBeatThreadPoolQueueCapacity) {
this.snodeHeartBeatThreadPoolQueueCapacity = snodeHeartBeatThreadPoolQueueCapacity;
}
public int getSnodeHeartBeatCorePoolSize() {
return snodeHeartBeatCorePoolSize;
}
public void setSnodeHeartBeatCorePoolSize(int snodeHeartBeatCorePoolSize) {
this.snodeHeartBeatCorePoolSize = snodeHeartBeatCorePoolSize;
}
public int getSnodeHeartBeatMaxPoolSize() {
return snodeHeartBeatMaxPoolSize;
}
public void setSnodeHeartBeatMaxPoolSize(int snodeHeartBeatMaxPoolSize) {
this.snodeHeartBeatMaxPoolSize = snodeHeartBeatMaxPoolSize;
}
public int getListenPort() {
return listenPort;
}
public String getRocketmqHome() {
return rocketmqHome;
}
public void setRocketmqHome(String rocketmqHome) {
this.rocketmqHome = rocketmqHome;
}
public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}
public int getSnodeSendThreadPoolQueueCapacity() {
return snodeSendThreadPoolQueueCapacity;
}
public void setSnodeSendThreadPoolQueueCapacity(int snodeSendThreadPoolQueueCapacity) {
this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity;
}
public int getSnodeSendMessageMinPoolSize() {
return snodeSendMessageMinPoolSize;
}
public void setSnodeSendMessageMinPoolSize(int snodeSendMessageMinPoolSize) {
this.snodeSendMessageMinPoolSize = snodeSendMessageMinPoolSize;
}
public int getSnodeSendMessageMaxPoolSize() {
return snodeSendMessageMaxPoolSize;
}
public void setSnodeSendMessageMaxPoolSize(int snodeSendMessageMaxPoolSize) {
this.snodeSendMessageMaxPoolSize = snodeSendMessageMaxPoolSize;
}
public String getSnodeAddr() {
return snodeAddr;
}
public void setSnodeAddr(String snodeAddr) {
this.snodeAddr = snodeAddr;
}
public String getSnodeName() {
return snodeName;
}
public void setSnodeName(String snodeName) {
this.snodeName = snodeName;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public boolean isNotifyConsumerIdsChangedEnable() {
return notifyConsumerIdsChangedEnable;
}
public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
}
public boolean isAutoCreateSubscriptionGroup() {
return autoCreateSubscriptionGroup;
}
public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
public class ConsumerManageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final SnodeController snodeController;
public ConsumerManageProcessor(final SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
default:
break;
}
return null;
}
@Override
public boolean rejectRequest() {
return false;
}
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
final GetConsumerListByGroupRequestHeader requestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo =
this.snodeController.getConsumerManager().getConsumerGroupInfo(
requestHeader.getConsumerGroup());
if (consumerGroupInfo != null) {
List<String> clientIds = consumerGroupInfo.getAllClientId();
if (!clientIds.isEmpty()) {
GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
} else {
log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
} else {
log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
return response;
}
}
package org.apache.rocketmq.snode.processor;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ClientChannelInfo;
public class HearbeatProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
public HearbeatProcessor(SnodeController snodeController) {
this.snodeController = snodeController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);
if (heartbeatData.getProducerDataSet() != null) {
for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
this.snodeController.getProducerManager().registerProducer(producerData.getGroupName(),
clientChannelInfo);
}
}
if (heartbeatData.getConsumerDataSet() != null) {
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
SubscriptionGroupConfig subscriptionGroupConfig =
this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
boolean changed = this.snodeController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
if (changed) {
log.info("registerConsumer info changed {} {}",
data.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())
);
}
}
}
RemotingCommand response = RemotingCommand.createResponseCommand(null);
return response;
}
@Override
public boolean rejectRequest() {
return false;
}
}
package org.apache.rocketmq.snode.processor;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.SnodeOuterService;
public class PullMessageProcessor implements NettyRequestProcessor {
private final SnodeOuterService snodeOuterService;
public PullMessageProcessor(SnodeOuterService snodeOuterService){
this.snodeOuterService = snodeOuterService;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
return snodeOuterService.pullMessage(request);
}
@Override
public boolean rejectRequest() {
return false;
}
}
package org.apache.rocketmq.snode.processor;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.SnodeOuterService;
public class SendMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeOuterService snodeOuterService;
public SendMessageProcessor(final SnodeOuterService snodeOuterService) {
this.snodeOuterService = snodeOuterService;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
return snodeOuterService.sendMessage(request);
}
@Override
public boolean rejectRequest() {
return false;
}
}
package org.apache.rocketmq.snode.service;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public interface ScheduledService {
void startScheduleTask();
void shutdown();
}
package org.apache.rocketmq.snode.service;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface SendTransferService {
RemotingCommand sendMessage(RemotingCommand request);
boolean start();
void shutdown();
}
package org.apache.rocketmq.snode.service;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.netty.channel.Channel;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.config.SnodeConfig;
public interface SnodeOuterService {
void sendHearbeat(RemotingCommand remotingCommand);
RemotingCommand sendMessage(RemotingCommand remotingCommand);
RemotingCommand pullMessage(RemotingCommand remotingCommand);
void saveSubscriptionData(RemotingCommand remotingCommand);
void start();
void shutdown();
void registerSnode(SnodeConfig snodeConfig);
void updateNameServerAddressList(final String addrs);
String fetchNameServerAddr();
void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException;
void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup);
RemotingCommand creatTopic(TopicConfig topicConfig);
}
package org.apache.rocketmq.snode.service.impl;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.SnodeData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.SnodeOuterService;
public class ScheduledServiceImpl implements ScheduledService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private SnodeOuterService snodeOuterService;
private SnodeConfig snodeConfig;
private final RemotingCommand enodeHeartbeat;
public ScheduledServiceImpl(SnodeOuterService snodeOuterService, SnodeConfig snodeConfig) {
this.snodeOuterService = snodeOuterService;
this.snodeConfig = snodeConfig;
enodeHeartbeat = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
HeartbeatData heartbeatData = new HeartbeatData();
heartbeatData.setClientID(snodeConfig.getSnodeName());
SnodeData snodeData = new SnodeData();
snodeData.setSnodeName(snodeConfig.getSnodeName());
heartbeatData.setSnodeData(snodeData);
enodeHeartbeat.setBody(heartbeatData.encode());
}
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "SNodeScheduledThread");
}
});
@Override
public void startScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
snodeOuterService.sendHearbeat(enodeHeartbeat);
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 1000 * 10, this.snodeConfig.getSnodeHeartBeatInterval(), TimeUnit.MILLISECONDS);
if (snodeConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
snodeOuterService.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
snodeOuterService.registerSnode(snodeConfig);
}
}, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
snodeOuterService.updateEnodeAddr(snodeConfig.getClusterName());
} catch (Exception ex) {
log.warn("Update broker addr error:{}", ex);
}
}
}, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
if (this.scheduledExecutorService != null) {
this.scheduledExecutorService.shutdown();
}
}
}
package org.apache.rocketmq.snode.service.impl;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.SendTransferService;
import org.apache.rocketmq.snode.service.SnodeOuterService;
public class SendTransferServiceImpl implements SendTransferService {
private ServiceState serviceState = ServiceState.CREATE_JUST;
private SnodeOuterService snodeOuterService;
public SendTransferServiceImpl(SnodeOuterService snodeOuterService) {
snodeOuterService = snodeOuterService;
}
@Override
public RemotingCommand sendMessage(RemotingCommand request) {
return snodeOuterService.sendMessage(request);
}
@Override
public boolean start() {
return false;
}
@Override
public void shutdown() {
}
}
package org.apache.rocketmq.snode.service.impl;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.SnodeOuterService;
public class SnodeOuterServiceImpl implements SnodeOuterService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private RemotingClient client;
private SnodeController snodeController;
private static SnodeOuterServiceImpl snodeOuterService;
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
new ConcurrentHashMap<>();
private final long defaultTimeoutMills = 3000L;
private SnodeOuterServiceImpl() {
}
public static SnodeOuterServiceImpl getInstance(SnodeController snodeController) {
if (snodeOuterService == null) {
synchronized (SnodeOuterServiceImpl.class) {
if (snodeOuterService == null) {
snodeOuterService = new SnodeOuterServiceImpl(snodeController);
return snodeOuterService;
}
}
}
return snodeOuterService;
}
private SnodeOuterServiceImpl(SnodeController snodeController) {
this.snodeController = snodeController;
this.client = RemotingClientFactory.createInstance().init(snodeController.getNettyClientConfig(), null);
}
@Override
public void start() {
this.client.start();
}
@Override
public void shutdown() {
this.client.shutdown();
}
@Override
public void sendHearbeat(RemotingCommand remotingCommand) {
for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
if (enodeAddr != null) {
try {
RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, defaultTimeoutMills);
} catch (Exception ex) {
log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
}
}
}
}
@Override
public RemotingCommand sendMessage(RemotingCommand request) {
try {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
RemotingCommand response =
this.client.invokeSync(sendMessageRequestHeaderV2.getN(), request, defaultTimeoutMills);
return response;
} catch (Exception ex) {
log.error("Send message async error:", ex);
}
return null;
}
@Override
public RemotingCommand pullMessage(RemotingCommand request) {
try {
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
RemotingCommand remotingCommand = this.client.invokeSync(requestHeader.getEnodeAddr(), request, 20 * defaultTimeoutMills);
log.info("Pull message response:{}", remotingCommand);
log.info("Pull message response:{}", remotingCommand.getBody().length);
return remotingCommand;
} catch (Exception ex) {
log.error("pull message async error:", ex);
}
return null;
}
@Override
public void saveSubscriptionData(RemotingCommand remotingCommand) {
}
@Override
public String fetchNameServerAddr() {
try {
String addrs = this.topAddressing.fetchNSAddr();
if (addrs != null) {
if (!addrs.equals(this.nameSrvAddr)) {
log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
this.updateNameServerAddressList(addrs);
this.nameSrvAddr = addrs;
return nameSrvAddr;
}
}
} catch (Exception e) {
log.error("fetchNameServerAddr Exception", e);
}
return nameSrvAddr;
}
private ClusterInfo getBrokerClusterInfo(
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
RemotingCommand response = this.client.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
@Override
public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
synchronized (this) {
ClusterInfo clusterInfo = getBrokerClusterInfo(defaultTimeoutMills);
if (clusterInfo != null) {
HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
Set<String> brokerNames = entry.getValue();
if (brokerNames != null) {
for (String brokerName : brokerNames) {
enodeTable.put(brokerName, clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs());
}
}
}
}
}
}
public void updateNameServerAddressList(final String addrs) {
List<String> list = new ArrayList<String>();
String[] addrArray = addrs.split(";");
for (String addr : addrArray) {
list.add(addr);
}
this.client.updateNameServerAddressList(list);
}
public void registerSnode(SnodeConfig snodeConfig) {
List<String> nameServerAddressList = this.client.getNameServerAddressList();
RemotingCommand remotingCommand = new RemotingCommand();
RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
requestHeader.setSnodeName(snodeConfig.getSnodeName());
requestHeader.setClusterName(snodeConfig.getClusterName());
remotingCommand.setCustomHeader(requestHeader);
remotingCommand.setCode(RequestCode.REGISTER_SNODE);
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
for (String nameServer : nameServerAddressList) {
try {
this.client.invokeSync(nameSrvAddr, remotingCommand, 3000L);
} catch (Exception ex) {
log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
}
}
}
}
@Override
public void notifyConsumerIdsChanged(
final Channel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
return;
}
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
this.snodeController.getSnodeServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
}
}
@Override
public RemotingCommand creatTopic(TopicConfig topicConfig) {
// CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
// requestHeader.setTopic(topicConfig.getTopicName());
// requestHeader.setDefaultTopic(defaultTopic);
// requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
// requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
// requestHeader.setPerm(topicConfig.getPerm());
// requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
// requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
// requestHeader.setOrder(topicConfig.isOrder());
//
// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
//
// RemotingCommand response = this.client.invokeSync(,
// request, defaultTimeoutMills);
// assert response != null;
// switch (response.getCode()) {
// case ResponseCode.SUCCESS: {
// return;
// }
// default:
// break;
// }
return null;
}
}
package org.apache.rocketmq.snode.topic;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class TopicConfigManager {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册