提交 0f0b5653 编写于 作者: D duhenglucky

Merge storage-with-deleger branch

......@@ -10,4 +10,6 @@ devenv
*.versionsBackup
!NOTICE-BIN
!LICENSE-BIN
.DS_Store
\ No newline at end of file
.DS_Store
localbin
nohup.out
Apache RocketMQ
Copyright 2016-2017 The Apache Software Foundation
Copyright 2016-2018 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
......@@ -21,6 +21,8 @@ It offers a variety of features:
* Various message filter mechanics such as SQL and Tag
* Docker images for isolated testing and cloud isolated clusters
* Feature-rich administrative dashboard for configuration, metrics and monitoring
* Access control list
* Message trace
----------
......@@ -37,8 +39,7 @@ It offers a variety of features:
----------
## Apache RocketMQ Community
* [RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)
* [RocketMQ Community Projects](https://github.com/apache/rocketmq-externals)
----------
## Contributing
......@@ -47,3 +48,5 @@ We always welcome new contributions, whether for trivial cleanups, big new featu
----------
## License
[Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
......@@ -9,14 +9,13 @@
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
......
......@@ -59,7 +59,6 @@ public class PlainAccessValidatorTest {
messageRequestHeader.setTopic("topicA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encode(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
......@@ -77,7 +76,6 @@ public class PlainAccessValidatorTest {
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encode(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
......@@ -93,7 +91,6 @@ public class PlainAccessValidatorTest {
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
......@@ -108,7 +105,6 @@ public class PlainAccessValidatorTest {
messageRequestHeader.setTopic("topicC");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
......@@ -201,6 +197,7 @@ public class PlainAccessValidatorTest {
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(CodecHelper.decode(buf), "192.168.0.1:9876");
plainAccessValidator.validate(accessResource);
}
......@@ -242,7 +239,6 @@ public class PlainAccessValidatorTest {
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
......@@ -261,7 +257,6 @@ public class PlainAccessValidatorTest {
messageRequestHeader.setTopic("topicB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClientRPCHook.doBeforeRequest("", remotingCommand);
ByteBuffer buf = CodecHelper.encodeHeader(remotingCommand);
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
......
......@@ -116,7 +116,7 @@ public class PlainPermissionLoaderTest {
Assert.assertEquals(resourcePermMap.size(), 3);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupA")).byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupB")).byteValue(), Permission.PUB | Permission.SUB);
Assert.assertEquals(resourcePermMap.get(PlainAccessResource.getRetryTopic("groupC")).byteValue(), Permission.PUB);
List<String> topics = new ArrayList<String>();
......@@ -129,7 +129,7 @@ public class PlainPermissionLoaderTest {
Assert.assertEquals(resourcePermMap.size(), 6);
Assert.assertEquals(resourcePermMap.get("topicA").byteValue(), Permission.DENY);
Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.PUB|Permission.SUB);
Assert.assertEquals(resourcePermMap.get("topicB").byteValue(), Permission.PUB | Permission.SUB);
Assert.assertEquals(resourcePermMap.get("topicC").byteValue(), Permission.PUB);
}
......@@ -156,6 +156,7 @@ public class PlainPermissionLoaderTest {
plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
}
@Test(expected = AclException.class)
public void checkErrorPerm() {
......@@ -163,6 +164,7 @@ public class PlainPermissionLoaderTest {
plainAccessResource.addResourceAndPerm("topicF", Permission.SUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
}
@Test(expected = AclException.class)
public void accountNullTest() {
plainAccessConfig.setAccessKey(null);
......@@ -212,12 +214,11 @@ public class PlainPermissionLoaderTest {
Assert.assertTrue(plainPermissionLoader.isWatchStart());
}
@Test
public void testWatch() throws IOException, IllegalAccessException ,InterruptedException{
public void testWatch() throws IOException, IllegalAccessException, InterruptedException {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl-test.yml");
String fileName =System.getProperty("rocketmq.home.dir", "src/test/resources")+System.getProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
String fileName = System.getProperty("rocketmq.home.dir", "src/test/resources") + System.getProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
File transport = new File(fileName);
transport.delete();
transport.createNewFile();
......
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -52,6 +46,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-filter</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
......
......@@ -27,10 +27,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
......@@ -38,6 +40,7 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
......@@ -99,6 +102,7 @@ import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
......@@ -159,6 +163,7 @@ public class BrokerController {
private TransactionalMessageCheckService transactionalMessageCheckService;
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
public BrokerController(
final BrokerConfig brokerConfig,
......@@ -234,6 +239,10 @@ public class BrokerController {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
......@@ -249,10 +258,8 @@ public class BrokerController {
if (result) {
this.remotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
ServerConfig fastConfig = (ServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
// this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.fastRemotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
......@@ -399,37 +406,26 @@ public class BrokerController {
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
} else {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
......@@ -473,6 +469,8 @@ public class BrokerController {
}
}
initialTransaction();
initialAcl();
// initialRpcHooks();
}
return result;
}
......@@ -492,6 +490,57 @@ public class BrokerController {
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
private void initialAcl() {
if (!this.brokerConfig.isAclEnable()) {
log.info("The broker dose not enable acl");
return;
}
List<AccessValidator> accessValidators = ServiceProvider.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The broker dose not load the AccessValidator");
return;
}
// for (AccessValidator accessValidator : accessValidators) {
// final AccessValidator validator = accessValidator;
// this.registerServerRPCHook(new Interceptor() {
//
// @Override public String interceptorName() {
// return "aclInterceptor";
// }
//
// @Override public void beforeRequest(RequestContext requestContext) {
//
// validator.validate(validator.parse(requestContext.getRequest(), requestContext.getRemotingChannel().remoteAddress().toString()));
//
// }
//
// @Override public void afterRequest(ResponseContext responseContext) {
//
// }
//
// @Override public void onException(ExceptionContext exceptionContext) {
//
// }
//
// });
// }
}
private void initialRpcHooks() {
List<RPCHook> rpcHooks = ServiceProvider.loadServiceList(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
for (RPCHook rpcHook : rpcHooks) {
this.remotingServer.registerServerRPCHook(rpcHook);
}
}
// registerInterceoptorGroup()
public void registerProcessor() {
/**
* SendMessageProcessor
......@@ -822,6 +871,11 @@ public class BrokerController {
this.filterServerManager.start();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
......@@ -844,12 +898,6 @@ public class BrokerController {
this.brokerFastFailure.start();
}
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
}
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
......@@ -1000,9 +1048,10 @@ public class BrokerController {
log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
}
public void registerServerRPCHook(RPCHook rpcHook) {
// getRemotingServer().registerRPCHook(rpcHook);
}
// public void registerServerRPCHook(RPCHook rpcHook) {
//// getRemotingServer().registerRPCHook(rpcHook);
//// this.fastRemotingServer.registerRPCHook(rpcHook);
//// }
public RemotingServer getRemotingServer() {
return remotingServer;
......@@ -1064,5 +1113,113 @@ public class BrokerController {
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
}
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
public void changeToSlave(int brokerId) {
log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
//change the role
brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
}
//handle the transactional service
try {
this.shutdownProcessorByHa();
} catch (Throwable t) {
log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
}
//handle the slave synchronise
handleSlaveSynchronize(BrokerRole.SLAVE);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
}
log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
//handle the slave synchronise
handleSlaveSynchronize(role);
//handle the scheduled service
try {
this.messageStore.handleScheduleMessageService(role);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
}
//handle the transactional service
try {
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
} catch (Throwable t) {
log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
}
//if the operations above are totally successful, we change to master
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
try {
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
private void shutdownProcessorByHa() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(true);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.dledger;
import io.openmessaging.storage.dledger.DLedgerLeaderElector;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
private BrokerController brokerController;
private DefaultMessageStore messageStore;
private DLedgerCommitLog dLedgerCommitLog;
private DLedgerServer dLegerServer;
public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
this.brokerController = brokerController;
this.messageStore = messageStore;
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
}
@Override public void handle(long term, MemberState.Role role) {
Runnable runnable = new Runnable() {
@Override public void run() {
long start = System.currentTimeMillis();
try {
boolean succ = true;
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLedgerCommitLog.getId());
}
break;
case FOLLOWER:
brokerController.changeToSlave(dLedgerCommitLog.getId());
break;
case LEADER:
while (true) {
if (!dLegerServer.getMemberState().isLeader()) {
succ = false;
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0) {
break;
}
Thread.sleep(100);
}
if (succ) {
messageStore.recoverTopicQueueTable();
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
}
break;
default:
break;
}
log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
} catch (Throwable t) {
log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
}
}
};
executorService.submit(runnable);
}
@Override public void startup() {
}
@Override public void shutdown() {
executorService.shutdown();
}
}
......@@ -152,7 +152,7 @@ public class BrokerOuterAPI {
registerBrokerResultList.add(result);
}
log.info("register broker to name server {} OK", namesrvAddr);
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
......
......@@ -54,7 +54,7 @@ public class SlaveSynchronize {
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
......@@ -78,7 +78,7 @@ public class SlaveSynchronize {
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
......@@ -94,7 +94,7 @@ public class SlaveSynchronize {
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
......@@ -118,7 +118,7 @@ public class SlaveSynchronize {
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
this.brokerController.getBrokerOuterAPI()
......
......@@ -124,6 +124,16 @@ public class TopicConfigManager extends ConfigManager {
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
{
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
}
public boolean isSystemTopic(final String topic) {
......
......@@ -22,36 +22,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
public class TransactionalMessageCheckService extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
private final AtomicBoolean started = new AtomicBoolean(false);
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public void start() {
if (started.compareAndSet(false, true)) {
super.start();
this.brokerController.getTransactionalMessageService().open();
}
}
@Override
public void shutdown(boolean interrupt) {
if (started.compareAndSet(true, false)) {
super.shutdown(interrupt);
this.brokerController.getTransactionalMessageService().close();
this.brokerController.getTransactionalMessageCheckListener().shutDown();
}
}
@Override
public String getServiceName() {
return TransactionalMessageCheckService.class.getSimpleName();
......
......@@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
......@@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
* @param removeMap Op message map to determine whether a half message was responded by producer.
* @param doneOpOffset Op Message which has been checked.
* @param msgExt Half message
* @param checkImmunityTime User defined time to avoid being detected early.
* @return Return true if put success, otherwise return false.
*/
private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset, MessageExt msgExt,
long checkImmunityTime) {
if (System.currentTimeMillis() - msgExt.getBornTimestamp() < checkImmunityTime) {
String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
if (null == prepareQueueOffsetStr) {
return putImmunityMsgBackToHalfQueue(msgExt);
private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset,
MessageExt msgExt) {
String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
if (null == prepareQueueOffsetStr) {
return putImmunityMsgBackToHalfQueue(msgExt);
} else {
long prepareQueueOffset = getLong(prepareQueueOffsetStr);
if (-1 == prepareQueueOffset) {
return false;
} else {
long prepareQueueOffset = getLong(prepareQueueOffsetStr);
if (-1 == prepareQueueOffset) {
return false;
if (removeMap.containsKey(prepareQueueOffset)) {
long tmpOpOffset = removeMap.remove(prepareQueueOffset);
doneOpOffset.add(tmpOpOffset);
return true;
} else {
if (removeMap.containsKey(prepareQueueOffset)) {
long tmpOpOffset = removeMap.remove(prepareQueueOffset);
doneOpOffset.add(tmpOpOffset);
return true;
} else {
return putImmunityMsgBackToHalfQueue(msgExt);
}
return putImmunityMsgBackToHalfQueue(msgExt);
}
}
} else {
return true;
}
}
......
org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
......@@ -24,6 +24,7 @@ import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.pagecache;
import java.nio.ByteBuffer;
import org.apache.rocketmq.store.GetMessageResult;
import org.junit.Assert;
import org.junit.Test;
public class ManyMessageTransferTest {
@Test
public void ManyMessageTransferBuilderTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
}
@Test
public void ManyMessageTransferPosTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
Assert.assertEquals(manyMessageTransfer.position(),4);
}
@Test
public void ManyMessageTransferCountTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
Assert.assertEquals(manyMessageTransfer.count(),20);
}
@Test
public void ManyMessageTransferCloseTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
GetMessageResult getMessageResult = new GetMessageResult();
ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
manyMessageTransfer.close();
manyMessageTransfer.deallocate();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.pagecache;
import java.nio.ByteBuffer;
import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.junit.Assert;
import org.junit.Test;
public class OneMessageTransferTest {
@Test
public void OneMessageTransferTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
}
@Test
public void OneMessageTransferCountTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
Assert.assertEquals(manyMessageTransfer.count(),40);
}
@Test
public void OneMessageTransferPosTest(){
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.putInt(20);
SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
Assert.assertEquals(manyMessageTransfer.position(),8);
}
}
......@@ -17,6 +17,8 @@
package org.apache.rocketmq.broker.util;
import java.util.List;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
import org.apache.rocketmq.remoting.util.ServiceProvider;
......@@ -39,4 +41,10 @@ public class ServiceProviderTest {
AbstractTransactionalMessageCheckListener.class);
assertThat(listener).isNotNull();
}
@Test
public void loadAccessValidatorTest() {
List<AccessValidator> accessValidators = ServiceProvider.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
assertThat(accessValidators).isNotNull();
}
}
org.apache.rocketmq.acl.plain.PlainAccessValidator
\ No newline at end of file
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -257,6 +257,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
}
@Override
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
}
@Override
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
}
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
......@@ -270,6 +282,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
}
@Override
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
}
@Override
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
}
@Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
......
......@@ -29,6 +29,10 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -36,6 +40,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -56,6 +61,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Internal implementation. Most of the functions herein are delegated to it.
*/
......@@ -246,6 +253,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long consumeTimeout = 15;
/**
* Interface of asynchronous transfer data
*/
private TraceDispatcher traceDispatcher = null;
/**
* Default constructor.
*/
......@@ -258,7 +270,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
......@@ -267,6 +279,33 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
/**
* Constructor specifying consumer group, RPC hook, message queue allocating algorithm, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
* Constructor specifying RPC hook.
*
......@@ -276,6 +315,28 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group and enabled msg trace flag.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, null);
}
/**
* Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.
*
* @param consumerGroup Consumer group.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying consumer group.
*
......@@ -518,6 +579,13 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
/**
......@@ -526,6 +594,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}
@Override
......@@ -694,4 +765,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
}
......@@ -66,6 +66,39 @@ public interface MQPullConsumer extends MQConsumer {
final int maxNums, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
/**
* Pulling the messages, not blocking
* <p>
* support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
* </p>
*
* @param mq from which message queue
* @param selector message selector({@link MessageSelector}), can be null.
* @param offset from where to pull
* @param maxNums max pulling numbers
* @return The resulting {@code PullRequest}
*/
PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
/**
* Pulling the messages in the specified timeout
* <p>
* support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
* </p>
*
* @param mq from which message queue
* @param selector message selector({@link MessageSelector}), can be null.
* @param offset from where to pull
* @param maxNums max pulling numbers
* @param timeout Pulling the messages in the specified timeout
* @return The resulting {@code PullRequest}
*/
PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
/**
* Pulling the messages in a async. way
*/
......@@ -80,6 +113,20 @@ public interface MQPullConsumer extends MQConsumer {
final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
InterruptedException;
/**
* Pulling the messages in a async. way. Support message selection
*/
void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException, RemotingException,
InterruptedException;
/**
* Pulling the messages in a async. way. Support message selection
*/
void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
InterruptedException;
/**
* Pulling the messages,if no message arrival,blocking some time
*
......
......@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
......@@ -46,6 +47,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
......@@ -158,17 +160,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
}
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
}
private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
throws MQClientException {
if (null == mq) {
throw new MQClientException("mq is null", null);
}
try {
return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
}
private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
throws MQClientException {
if (null == mq) {
throw new MQClientException("mq is null", null);
}
try {
return FilterAPI.build(mq.getTopic(),
messageSelector.getExpression(), messageSelector.getExpressionType());
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
}
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
if (null == mq) {
throw new MQClientException("mq is null", null);
}
if (offset < 0) {
......@@ -183,20 +226,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
0L,
subscriptionData.getExpressionType(),
isTagType ? 0L : subscriptionData.getSubVersion(),
offset,
maxNums,
sysFlag,
......@@ -369,12 +406,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
}
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
}
private void pullAsyncImpl(
final MessageQueue mq,
final String subExpression,
final SubscriptionData subscriptionData,
final long offset,
final int maxNums,
final PullCallback pullCallback,
......@@ -403,20 +455,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
try {
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
final SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
0L,
subscriptionData.getExpressionType(),
isTagType ? 0L : subscriptionData.getSubVersion(),
offset,
maxNums,
sysFlag,
......@@ -444,7 +490,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
public DefaultMQPullConsumer getDefaultMQPullConsumer() {
......@@ -454,7 +501,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
......
......@@ -212,34 +212,6 @@ public class PullAPIWrapper {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pullKernelImpl(
mq,
subExpression,
ExpressionType.TAG,
subVersion, offset,
maxNums,
sysFlag,
commitOffset,
brokerSuspendMaxTimeMillis,
timeoutMillis,
communicationMode,
pullCallback
);
}
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
......
......@@ -678,10 +678,10 @@ public class MQClientInstance {
} catch (Exception e) {
log.error("send heart beat error:{}", e);
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr);
id, addr, e);
}
}
}
......@@ -1186,6 +1186,7 @@ public class MQClientInstance {
return this.brokerVersionTable.get(brokerName).get(brokerAddr);
}
}
//To do need to fresh the version
return 0;
}
......
......@@ -30,8 +30,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
......@@ -101,6 +103,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null);
}
......@@ -108,6 +114,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
......@@ -462,7 +484,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
......@@ -656,6 +678,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return topicPublishInfo;
}
}
private void tryToFindSnodePublishInfo() {
this.mQClientFactory.updateSnodeInfoFromNameServer();
}
......@@ -967,7 +990,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
......@@ -1091,7 +1114,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
......@@ -1258,7 +1281,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public ExecutorService getCallbackExecutor() {
return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
}
public ExecutorService getAsyncSenderExecutor() {
return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor;
}
public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
this.asyncSenderExecutor = asyncSenderExecutor;
}
public SendResult send(Message msg,
......
......@@ -25,6 +25,10 @@ import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
......@@ -33,6 +37,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
......@@ -55,6 +60,8 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private final InternalLogger log = ClientLogger.getLog();
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
......@@ -118,6 +125,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
*/
private int maxMessageSize = 1024 * 1024 * 4; // 4M
/**
* Interface of asynchronous transfer data
*/
private TraceDispatcher traceDispatcher = null;
/**
* Default constructor.
*/
......@@ -136,6 +148,31 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
/**
* Constructor specifying producer group, RPC hook, enabled msgTrace flag and customized trace topic name.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
* Constructor specifying producer group.
*
......@@ -146,8 +183,30 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Constructor specifying the RPC hook.
* Constructor specifying producer group and enabled msg trace flag.
*
* @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace) {
this(producerGroup, null, enableMsgTrace, null);
}
/**
* Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
*
* @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(producerGroup, null, enableMsgTrace, customizedTraceTopic);
}
/**
* Constructor specifying the RPC hook.
*
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(RPCHook rpcHook) {
......@@ -169,6 +228,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
/**
......@@ -177,6 +243,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
@Override
public void shutdown() {
this.defaultMQProducerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}
/**
......@@ -654,6 +723,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
}
/**
* Sets an Executor to be used for executing asynchronous send. If the Executor is not set, {@link
* DefaultMQProducerImpl#defaultAsyncSenderExecutor} will be used.
*
* @param asyncSenderExecutor the instance of Executor
*/
public void setAsyncSenderExecutor(final ExecutorService asyncSenderExecutor) {
this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
......@@ -776,4 +855,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.UUID;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
public class AsyncTraceDispatcher implements TraceDispatcher {
private final static InternalLogger log = ClientLogger.getLog();
private final int queueSize;
private final int batchSize;
private final int maxMsgSize;
private final DefaultMQProducer traceProducer;
private final ThreadPoolExecutor traceExecuter;
// The last discard number of log
private AtomicLong discardCount;
private Thread worker;
private ArrayBlockingQueue<TraceContext> traceContextQueue;
private ArrayBlockingQueue<Runnable> appenderQueue;
private volatile Thread shutDownHook;
private volatile boolean stopped = false;
private DefaultMQProducerImpl hostProducer;
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) throws MQClientException {
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
} else {
this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
}
this.traceExecuter = new ThreadPoolExecutor(//
10, //
20, //
1000 * 60, //
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = getAndCreateTraceProducer(rpcHook);
}
public String getTraceTopicName() {
return traceTopicName;
}
public void setTraceTopicName(String traceTopicName) {
this.traceTopicName = traceTopicName;
}
public DefaultMQProducer getTraceProducer() {
return traceProducer;
}
public DefaultMQProducerImpl getHostProducer() {
return hostProducer;
}
public void setHostProducer(DefaultMQProducerImpl hostProducer) {
this.hostProducer = hostProducer;
}
public DefaultMQPushConsumerImpl getHostConsumer() {
return hostConsumer;
}
public void setHostConsumer(DefaultMQPushConsumerImpl hostConsumer) {
this.hostConsumer = hostConsumer;
}
public void start(String nameSrvAddr) throws MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K
traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
}
return traceProducerInstance;
}
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
if (!result) {
log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
}
return result;
}
@Override
public void flush() throws IOException {
// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
break;
}
}
log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());
}
@Override
public void shutdown() {
this.stopped = true;
this.traceExecuter.shutdown();
if (isStarted.get()) {
traceProducer.shutdown();
}
this.removeShutdownHook();
}
public void registerShutDownHook() {
if (shutDownHook == null) {
shutDownHook = new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
@Override
public void run() {
synchronized (this) {
if (!this.hasShutdown) {
try {
flush();
} catch (IOException e) {
log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
}
}
}
}
}, "ShutdownHookMQTrace");
Runtime.getRuntime().addShutdownHook(shutDownHook);
}
}
public void removeShutdownHook() {
if (shutDownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutDownHook);
}
}
class AsyncRunnable implements Runnable {
private boolean stopped;
@Override
public void run() {
while (!stopped) {
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
//get trace data element from blocking Queue — traceContextQueue
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}
if (contexts.size() > 0) {
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecuter.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}
class AsyncAppenderRequest implements Runnable {
List<TraceContext> contextList;
public AsyncAppenderRequest(final List<TraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<TraceContext>(1);
}
}
@Override
public void run() {
sendTraceData(contextList);
}
public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
for (TraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) {
continue;
}
// Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic();
// Use original message entity's topic as key
String key = topic;
List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>();
transBeanMap.put(key, transBeanList);
}
TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}
for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
flushData(entry.getValue());
}
}
/**
* Batch sending data actually
*/
private void flushData(List<TraceTransferBean> transBeanList) {
if (transBeanList.size() == 0) {
return;
}
// Temporary buffer
StringBuilder buffer = new StringBuilder(1024);
int count = 0;
Set<String> keySet = new HashSet<String>();
for (TraceTransferBean bean : transBeanList) {
// Keyset of message trace includes msgId of or original message
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
count++;
// Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString());
// Clear temporary buffer after finishing
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString());
}
transBeanList.clear();
}
/**
* Send message trace data
*
* @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
* @param data the message trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data) {
String topic = traceTopicName;
final Message message = new Message(topic, data.getBytes());
// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
try {
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
log.info("send trace data ,the traceData is " + data);
}
};
if (traceBrokerSet.isEmpty()) {
// No cross set
traceProducer.send(message, callback, 5000);
} else {
traceProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Set<String> brokerSet = (Set<String>) arg;
List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
for (MessageQueue queue : mqs) {
if (brokerSet.contains(queue.getBrokerName())) {
filterMqs.add(queue);
}
}
int index = sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % filterMqs.size();
if (pos < 0) {
pos = 0;
}
return filterMqs.get(pos);
}
}, traceBrokerSet, callback);
}
} catch (Exception e) {
log.info("send trace data,the traceData is" + data);
}
}
private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
Set<String> brokerSet = new HashSet<String>();
TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
brokerSet.add(queue.getBrokerName());
}
}
return brokerSet;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
public class TraceBean {
private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
private String topic = "";
private String msgId = "";
private String offsetMsgId = "";
private String tags = "";
private String keys = "";
private String storeHost = LOCAL_ADDRESS;
private String clientHost = LOCAL_ADDRESS;
private long storeTime;
private int retryTimes;
private int bodyLength;
private MessageType msgType;
public MessageType getMsgType() {
return msgType;
}
public void setMsgType(final MessageType msgType) {
this.msgType = msgType;
}
public String getOffsetMsgId() {
return offsetMsgId;
}
public void setOffsetMsgId(final String offsetMsgId) {
this.offsetMsgId = offsetMsgId;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getTags() {
return tags;
}
public void setTags(String tags) {
this.tags = tags;
}
public String getKeys() {
return keys;
}
public void setKeys(String keys) {
this.keys = keys;
}
public String getStoreHost() {
return storeHost;
}
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
public String getClientHost() {
return clientHost;
}
public void setClientHost(String clientHost) {
this.clientHost = clientHost;
}
public long getStoreTime() {
return storeTime;
}
public void setStoreTime(long storeTime) {
this.storeTime = storeTime;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
}
public int getBodyLength() {
return bodyLength;
}
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
public class TraceConstants {
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import java.util.List;
/**
* The context of Trace
*/
public class TraceContext implements Comparable<TraceContext> {
private TraceType traceType;
private long timeStamp = System.currentTimeMillis();
private String regionId = "";
private String regionName = "";
private String groupName = "";
private int costTime = 0;
private boolean isSuccess = true;
private String requestId = MessageClientIDSetter.createUniqID();
private int contextCode = 0;
private List<TraceBean> traceBeans;
public int getContextCode() {
return contextCode;
}
public void setContextCode(final int contextCode) {
this.contextCode = contextCode;
}
public List<TraceBean> getTraceBeans() {
return traceBeans;
}
public void setTraceBeans(List<TraceBean> traceBeans) {
this.traceBeans = traceBeans;
}
public String getRegionId() {
return regionId;
}
public void setRegionId(String regionId) {
this.regionId = regionId;
}
public TraceType getTraceType() {
return traceType;
}
public void setTraceType(TraceType traceType) {
this.traceType = traceType;
}
public long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public int getCostTime() {
return costTime;
}
public void setCostTime(int costTime) {
this.costTime = costTime;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess(boolean success) {
isSuccess = success;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getRegionName() {
return regionName;
}
public void setRegionName(String regionName) {
this.regionName = regionName;
}
@Override
public int compareTo(TraceContext o) {
return (int) (this.timeStamp - o.getTimeStamp());
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(1024);
sb.append(traceType).append("_").append(groupName)
.append("_").append(regionId).append("_").append(isSuccess).append("_");
if (traceBeans != null && traceBeans.size() > 0) {
for (TraceBean bean : traceBeans) {
sb.append(bean.getMsgId() + "_" + bean.getTopic() + "_");
}
}
return "TraceContext{" + sb.toString() + '}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.message.MessageType;
import java.util.ArrayList;
import java.util.List;
/**
* Encode/decode for Trace Data
*/
public class TraceDataEncoder {
/**
* Resolving traceContext list From trace data String
*
* @param traceData
* @return
*/
public static List<TraceContext> decoderFromTraceDataString(String traceData) {
List<TraceContext> resList = new ArrayList<TraceContext>();
if (traceData == null || traceData.length() <= 0) {
return resList;
}
String[] contextList = traceData.split(String.valueOf(TraceConstants.FIELD_SPLITOR));
for (String context : contextList) {
String[] line = context.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
if (line[0].equals(TraceType.Pub.name())) {
TraceContext pubContext = new TraceContext();
pubContext.setTraceType(TraceType.Pub);
pubContext.setTimeStamp(Long.parseLong(line[1]));
pubContext.setRegionId(line[2]);
pubContext.setGroupName(line[3]);
TraceBean bean = new TraceBean();
bean.setTopic(line[4]);
bean.setMsgId(line[5]);
bean.setTags(line[6]);
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setBodyLength(Integer.parseInt(line[9]));
pubContext.setCostTime(Integer.parseInt(line[10]));
bean.setMsgType(MessageType.values()[Integer.parseInt(line[11])]);
if (line.length == 13) {
pubContext.setSuccess(Boolean.parseBoolean(line[12]));
} else if (line.length == 14) {
bean.setOffsetMsgId(line[12]);
pubContext.setSuccess(Boolean.parseBoolean(line[13]));
}
pubContext.setTraceBeans(new ArrayList<TraceBean>(1));
pubContext.getTraceBeans().add(bean);
resList.add(pubContext);
} else if (line[0].equals(TraceType.SubBefore.name())) {
TraceContext subBeforeContext = new TraceContext();
subBeforeContext.setTraceType(TraceType.SubBefore);
subBeforeContext.setTimeStamp(Long.parseLong(line[1]));
subBeforeContext.setRegionId(line[2]);
subBeforeContext.setGroupName(line[3]);
subBeforeContext.setRequestId(line[4]);
TraceBean bean = new TraceBean();
bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]);
subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext);
} else if (line[0].equals(TraceType.SubAfter.name())) {
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);
subAfterContext.setRequestId(line[1]);
TraceBean bean = new TraceBean();
bean.setMsgId(line[2]);
bean.setKeys(line[5]);
subAfterContext.setTraceBeans(new ArrayList<TraceBean>(1));
subAfterContext.getTraceBeans().add(bean);
subAfterContext.setCostTime(Integer.parseInt(line[3]));
subAfterContext.setSuccess(Boolean.parseBoolean(line[4]));
if (line.length >= 7) {
// add the context type
subAfterContext.setContextCode(Integer.parseInt(line[6]));
}
resList.add(subAfterContext);
}
}
return resList;
}
/**
* Encoding the trace context into data strings and keyset sets
*
* @param ctx
* @return
*/
public static TraceTransferBean encoderFromContextBean(TraceContext ctx) {
if (ctx == null) {
return null;
}
//build message trace of the transfering entity content bean
TraceTransferBean transferBean = new TraceTransferBean();
StringBuilder sb = new StringBuilder(256);
switch (ctx.getTraceType()) {
case Pub: {
TraceBean bean = ctx.getTraceBeans().get(0);
//append the content of context and traceBean to transferBean's TransData
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getBodyLength()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);
}
break;
case SubBefore: {
for (TraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
}
}
break;
case SubAfter: {
for (TraceBean bean : ctx.getTraceBeans()) {
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
}
}
break;
default:
}
transferBean.setTransData(sb.toString());
for (TraceBean bean : ctx.getTraceBeans()) {
transferBean.getTransKey().add(bean.getMsgId());
if (bean.getKeys() != null && bean.getKeys().length() > 0) {
transferBean.getTransKey().add(bean.getKeys());
}
}
return transferBean;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.exception.MQClientException;
import java.io.IOException;
/**
* Interface of asynchronous transfer data
*/
public interface TraceDispatcher {
/**
* Initialize asynchronous transfer data module
*/
void start(String nameSrvAddr) throws MQClientException;
/**
* Append the transfering data
* @param ctx data infomation
* @return
*/
boolean append(Object ctx);
/**
* Write flush action
*
* @throws IOException
*/
void flush() throws IOException;
/**
* Close the trace Hook
*/
void shutdown();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
public enum TraceDispatcherType {
PRODUCER,
CONSUMER
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.util.HashSet;
import java.util.Set;
/**
* Trace transfering bean
*/
public class TraceTransferBean {
private String transData;
private Set<String> transKey = new HashSet<String>();
public String getTransData() {
return transData;
}
public void setTransData(String transData) {
this.transData = transData;
}
public Set<String> getTransKey() {
return transKey;
}
public void setTransKey(Set<String> transKey) {
this.transKey = transKey;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
public enum TraceType {
Pub,
SubBefore,
SubAfter,
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.ArrayList;
import java.util.List;
public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
private TraceDispatcher localDispatcher;
public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "ConsumeMessageTraceHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext traceContext = new TraceContext();
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.SubBefore);//
traceContext.setGroupName(context.getConsumerGroup());//
List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
if (traceOn != null && traceOn.equals("false")) {
// If trace switch is false ,skip it
continue;
}
TraceBean traceBean = new TraceBean();
traceBean.setTopic(msg.getTopic());//
traceBean.setMsgId(msg.getMsgId());//
traceBean.setTags(msg.getTags());//
traceBean.setKeys(msg.getKeys());//
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
if (beans.size() > 0) {
traceContext.setTraceBeans(beans);
traceContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(traceContext);
}
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// If subbefore bean is null ,skip it
return;
}
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);//
subAfterContext.setRegionId(subBeforeContext.getRegionId());//
subAfterContext.setGroupName(subBeforeContext.getGroupName());//
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
// Caculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}
localDispatcher.append(subAfterContext);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
import org.apache.rocketmq.client.trace.TraceType;
import java.util.ArrayList;
public class SendMessageTraceHookImpl implements SendMessageHook {
private TraceDispatcher localDispatcher;
public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}
@Override
public String hookName() {
return "SendMessageTraceHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
context.setMqTraceContext(tuxeContext);
tuxeContext.setTraceType(TraceType.Pub);
tuxeContext.setGroupName(context.getProducerGroup());
//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
tuxeContext.getTraceBeans().add(traceBean);
}
@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message trace data,then it doesn't recorded
if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {
return;
}
if (context.getSendResult().getRegionId() == null
|| !context.getSendResult().isTraceOn()) {
// if switch is false,skip it
return;
}
TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime);
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
tuxeContext.setSuccess(true);
} else {
tuxeContext.setSuccess(false);
}
tuxeContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
localDispatcher.append(tuxeContext);
}
}
......@@ -169,10 +169,7 @@ public class DefaultMQProducerTest {
@Test
public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
......@@ -188,16 +185,12 @@ public class DefaultMQProducerTest {
}
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
}
@Test
public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(6);
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
SendCallback sendCallback = new SendCallback() {
@Override
......@@ -229,16 +222,13 @@ public class DefaultMQProducerTest {
producer.send(message, messageQueueSelector, null, sendCallback, 1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
assertThat(cc.get()).isEqualTo(6);
}
@Test
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(bigMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
......@@ -254,7 +244,6 @@ public class DefaultMQProducerTest {
}
});
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQConsumerWithTraceTest {
private String consumerGroup;
private String consumerGroupNormal;
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
private DefaultMQPushConsumer normalPushConsumer;
private DefaultMQPushConsumer customTraceTopicpushConsumer;
private AsyncTraceDispatcher asyncTraceDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
private DefaultMQProducer traceProducer;
private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,"");
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,"");
customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
asyncTraceDispatcher = (AsyncTraceDispatcher)pushConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory());
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory);
fieldTrace = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
fieldTrace.setAccessible(true);
fieldTrace.set(mQClientTraceFactory, mQClientTraceAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
}
@After
public void terminate() {
pushConsumer.shutdown();
}
// @Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
return null;
}
}));
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
pullRequest.setMessageQueue(messageQueue);
ProcessQueue processQueue = new ProcessQueue();
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
pullRequest.setProcessQueue(processQueue);
return pullRequest;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
public static TopicRouteData createTraceTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-trace");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10912");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker-trace");
queueData.setPerm(6);
queueData.setReadQueueNums(1);
queueData.setWriteQueueNums(1);
queueData.setTopicSynFlag(1);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private AsyncTraceDispatcher asyncTraceDispatcher;
private DefaultMQProducer producer;
private DefaultMQProducer customTraceTopicproducer;
private DefaultMQProducer traceProducer;
private DefaultMQProducer normalProducer;
private Message message;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp, false, customerTraceTopic);
normalProducer = new DefaultMQProducer(producerGroupTemp, false, "");
producer = new DefaultMQProducer(producerGroupTemp, true, "");
producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[]{'a', 'b', 'c'});
asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
asyncTraceDispatcher.getHostProducer();
asyncTraceDispatcher.getHostConsumer();
traceProducer = asyncTraceDispatcher.getTraceProducer();
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
} catch (MQClientException e) {
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@Test
public void testSendMessageSync_WithTrace_NoBrokerSet_Exception() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
producer.send(message);
} catch (MQClientException e) {
}
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
}
@After
public void terminate() {
producer.shutdown();
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSynFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
public static TopicRouteData createTraceTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("broker-trace");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10912");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("broker-trace");
queueData.setPerm(6);
queueData.setReadQueueNums(1);
queueData.setWriteQueueNums(1);
queueData.setTopicSynFlag(1);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
}
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -25,6 +25,8 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import static org.apache.rocketmq.common.SnodeConfig.localHostName;
public class BrokerConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
......@@ -51,7 +53,10 @@ public class BrokerConfig {
@ImportantField
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";
@ImportantField
private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
@ImportantField
private boolean traceTopicEnable = false;
/**
* thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
* value is 1.
......@@ -171,7 +176,6 @@ public class BrokerConfig {
@ImportantField
private long transactionCheckInterval = 60 * 1000;
@ImportantField
private boolean transactionEnable = true;
......@@ -182,6 +186,21 @@ public class BrokerConfig {
public void setTransactionEnable(boolean transactionEnable) {
this.transactionEnable = transactionEnable;
}
/**
* Acl feature switch
*/
@ImportantField
private boolean aclEnable = false;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_BROKER";
}
public boolean isTraceOn() {
return traceOn;
......@@ -247,16 +266,6 @@ public class BrokerConfig {
this.slaveReadEnable = slaveReadEnable;
}
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
log.error("Failed to obtain the host name", e);
}
return "DEFAULT_BROKER";
}
public int getRegisterBrokerTimeoutMills() {
return registerBrokerTimeoutMills;
}
......@@ -744,4 +753,28 @@ public class BrokerConfig {
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
}
public String getMsgTraceTopicName() {
return msgTraceTopicName;
}
public void setMsgTraceTopicName(String msgTraceTopicName) {
this.msgTraceTopicName = msgTraceTopicName;
}
public boolean isTraceTopicEnable() {
return traceTopicEnable;
}
public void setTraceTopicEnable(boolean traceTopicEnable) {
this.traceTopicEnable = traceTopicEnable;
}
public boolean isAclEnable() {
return aclEnable;
}
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
}
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion {
public static final int CURRENT_VERSION = Version.V4_3_1.ordinal();
public static final int CURRENT_VERSION = Version.V4_4_0.ordinal();
public static String getVersionDesc(int value) {
int length = Version.values().length;
......
......@@ -90,6 +90,7 @@ public class MixAll {
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
......
......@@ -27,18 +27,29 @@ public abstract class ServiceThread implements Runnable {
private static final long JOIN_TIME = 90 * 1000;
protected final Thread thread;
private Thread thread;
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
protected boolean isDaemon = false;
//Make it able to restart the thread
private final AtomicBoolean started = new AtomicBoolean(false);
public ServiceThread() {
this.thread = new Thread(this, this.getServiceName());
}
public abstract String getServiceName();
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
......@@ -47,6 +58,10 @@ public abstract class ServiceThread implements Runnable {
}
public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
......@@ -75,11 +90,16 @@ public abstract class ServiceThread implements Runnable {
return JOIN_TIME;
}
@Deprecated
public void stop() {
this.stop(false);
}
@Deprecated
public void stop(final boolean interrupt) {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
......@@ -93,6 +113,9 @@ public abstract class ServiceThread implements Runnable {
}
public void makeStop() {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("makestop thread " + this.getServiceName());
}
......@@ -128,4 +151,12 @@ public abstract class ServiceThread implements Runnable {
public boolean isStopped() {
return stopped;
}
public boolean isDaemon() {
return isDaemon;
}
public void setDaemon(boolean daemon) {
isDaemon = daemon;
}
}
......@@ -61,6 +61,18 @@ public class UtilAll {
}
}
public static void sleep(long sleepMs) {
if (sleepMs < 0) {
return;
}
try {
Thread.sleep(sleepMs);
} catch (Throwable ignored) {
}
}
public static String currentStackTrace() {
StringBuilder sb = new StringBuilder();
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
......
......@@ -18,5 +18,5 @@
package org.apache.rocketmq.common.constant;
public class DBMsgConstants {
public static final int MAX_BODY_SIZE = 64 * 1024 * 1204; //64KB
public static final int MAX_BODY_SIZE = 64 * 1024 * 1024; //64KB
}
......@@ -41,6 +41,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC
......
......@@ -27,4 +27,21 @@ public class BrokerConfigTest {
long expect = 1024L * 1024 * 1024 * 16;
assertThat(new BrokerConfig().getConsumerFallbehindThreshold()).isEqualTo(expect);
}
@Test
public void testBrokerConfigAttribute() {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setNamesrvAddr("127.0.0.1:9876");
brokerConfig.setAutoCreateTopicEnable(false);
brokerConfig.setBrokerName("broker-a");
brokerConfig.setBrokerId(0);
brokerConfig.setBrokerClusterName("DefaultCluster");
brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
}
}
\ No newline at end of file
......@@ -17,14 +17,13 @@
package org.apache.rocketmq.common;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
......
#! /bin/bash
## Revise the base dir
CURRENT_DIR="$(cd "$(dirname "$0")"; pwd)"
RMQ_DIR=$CURRENT_DIR/../..
cd $RMQ_DIR
function startNameserver() {
export JAVA_OPT_EXT=" -Xms512m -Xmx512m "
nohup bin/mqnamesrv &
}
function startBroker() {
export JAVA_OPT_EXT=" -Xms1g -Xmx1g "
conf_name=$1
nohup bin/mqbroker -c $conf_name &
}
function stopNameserver() {
PIDS=$(ps -ef|grep java|grep NamesrvStartup|grep -v grep|awk '{print $2}')
if [ ! -z "$PIDS" ]; then
kill -s TERM $PIDS
fi
}
function stopBroker() {
conf_name=$1
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
i=1
while [ ! -z "$PIDS" -a $i -lt 5 ]
do
echo "Waiting to kill ..."
kill -s TERM $PIDS
((i=$i+1))
sleep 2
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
done
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
if [ ! -z "$PIDS" ]; then
kill -9 $PIDS
fi
}
function stopAll() {
ps -ef|grep java|grep BrokerStartup|grep -v grep|awk '{print $2}'|xargs kill
stopNameserver
stopBroker ./conf/dledger/broker-n0.conf
stopBroker ./conf/dledger/broker-n1.conf
stopBroker ./conf/dledger/broker-n2.conf
}
function startAll() {
startNameserver
startBroker ./conf/dledger/broker-n0.conf
startBroker ./conf/dledger/broker-n1.conf
startBroker ./conf/dledger/broker-n2.conf
}
function checkConf() {
if [ ! -f ./conf/dledger/broker-n0.conf -o ! -f ./conf/dledger/broker-n1.conf -o ! -f ./conf/dledger/broker-n2.conf ]; then
echo "Make sure the ./conf/dledger/broker-n0.conf, ./conf/dledger/broker-n1.conf, ./conf/dledger/broker-n2.conf exists"
exit -1
fi
}
## Main
if [ $# -lt 1 ]; then
echo "Usage: sh $0 start|stop"
exit -1
fi
action=$1
checkConf
case $action in
"start")
startAll
exit
;;
"stop")
stopAll
;;
*)
echo "Usage: sh $0 start|stop"
;;
esac
......@@ -37,7 +37,7 @@ export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
brokerClusterName=DefaultCluster
brokerName=broker-trace
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30921
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30931
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
accessKey: rocketmq
secretKey: 12345678
......@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
......
......@@ -40,7 +40,7 @@
<fileSet>
<includes>
<include>bin/*</include>
<include>bin/**</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
......
此差异已折叠。
# 权限控制
## 前言
该文档主要介绍如何快速部署和使用支持权限控制特性的RocketMQ 集群。
## 1.权限控制特性介绍
权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在distribution/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常;
ACL客户端可以参考:**org.apache.rocketmq.example.simple**包下面的**AclClient**代码。
## 2. 权限控制的定义与属性值
### 2.1权限定义
对RocketMQ的Topic资源访问权限控制定义主要如下表所示,分为以下四种
| 权限 | 含义 |
| --- | --- |
| DENY | 拒绝 |
| ANY | PUB 或者 SUB 权限 |
| PUB | 发送权限 |
| SUB | 订阅权限 |
### 2.2 权限定义的关键属性
| 字段 | 取值 | 含义 |
| --- | --- | --- |
| globalWhiteRemoteAddresses | \*;192.168.\*.\*;192.168.0.1 | 全局IP白名单 |
| accessKey | 字符串 | Access Key |
| secretKey | 字符串 | Secret Key |
| whiteRemoteAddress | \*;192.168.\*.\*;192.168.0.1 | 用户IP白名单 |
| admin | true;false | 是否管理员账户 |
| defaultTopicPerm | DENY;PUB;SUB;PUB\|SUB | 默认的Topic权限 |
| defaultGroupPerm | DENY;PUB;SUB;PUB\|SUB | 默认的ConsumerGroup权限 |
| topicPerms | topic=权限 | 各个Topic的权限 |
| groupPerms | group=权限 | 各个ConsumerGroup的权限 |
具体可以参考**distribution/conf/plain_acl.yml**配置文件
## 3. 支持权限控制的集群部署
**distribution/conf/plain_acl.yml**配置文件中按照上述说明定义好权限属性后,打开**aclEnable**开关变量即可开启RocketMQ集群的ACL特性。这里贴出Broker端开启ACL特性的properties配置文件内容:
```
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if acl is open,the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876
```
## 4. 权限控制主要流程
ACL主要流程分为两部分,主要包括权限解析和权限校验。
### 4.1 权限解析
Broker端对客户端的RequestCommand请求进行解析,拿到需要鉴权的属性字段。
主要包括:
(1)AccessKey:类似于用户名,代指用户主体,权限数据与之对应;
(2)Signature:客户根据 SecretKey 签名得到的串,服务端再用SecretKey进行签名验证;
### 4.2 权限校验
Broker端对权限的校验逻辑主要分为以下几步:
(1)检查是否命中全局 IP 白名单;如果是,则认为校验通过;否则走 2;
(2)检查是否命中用户 IP 白名单;如果是,则认为校验通过;否则走 3;
(3)校验签名,校验不通过,抛出异常;校验通过,则走 4;
(4)对用户请求所需的权限 和 用户所拥有的权限进行校验;不通过,抛出异常;
用户所需权限的校验需要注意已下内容:
(1)特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;
(2)对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限;
## 5. 热加载修改后权限控制定义
RocketrMQ的权限控制存储的默认实现是基于yml配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点。
# 架构设计
## 技术架构
![](image/rocketmq_architecture_1.png)
RocketMQ架构上主要分为四部分,如上图所示:
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
![](image/rocketmq_architecture_2.png)
## 部署架构
![](image/rocketmq_architecture_3.png)
### RocketMQ 网络部署特点
- NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
- Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
- Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
结合部署架构图,描述集群工作流程:
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -51,7 +51,12 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.4.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
......@@ -26,7 +26,6 @@ public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册