提交 48fa70bc 编写于 作者: D duhenglucky

Merge branch 5.0.0-preview into pop_consumer

......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
......
......@@ -44,6 +44,16 @@ public class AclConstants {
public static final String CONFIG_TIME_STAMP = "timestamp";
public static final String PUB = "PUB";
public static final String SUB = "SUB";
public static final String DENY = "DENY";
public static final String PUB_SUB = "PUB|SUB";
public static final String SUB_PUB = "SUB|PUB";
public static final int ACCESS_KEY_MIN_LENGTH = 6;
public static final int SECRET_KEY_MIN_LENGTH = 6;
......
......@@ -60,14 +60,14 @@ public class Permission {
return Permission.DENY;
}
switch (permString.trim()) {
case "PUB":
case AclConstants.PUB:
return Permission.PUB;
case "SUB":
case AclConstants.SUB:
return Permission.SUB;
case "PUB|SUB":
case "SUB|PUB":
case AclConstants.PUB_SUB:
case AclConstants.SUB_PUB:
return Permission.PUB | Permission.SUB;
case "DENY":
case AclConstants.DENY:
return Permission.DENY;
default:
return Permission.DENY;
......@@ -89,6 +89,25 @@ public class Permission {
}
}
public static void checkResourcePerms(List<String> resources) {
if (resources == null || resources.isEmpty()) {
return;
}
for (String resource : resources) {
String[] items = StringUtils.split(resource, "=");
if (items.length != 2) {
throw new AclException(String.format("Parse Resource format error for %s.\n" +
"The expected resource format is 'Res=Perm'. For example: topicA=SUB", resource));
}
if (!AclConstants.DENY.equals(items[1].trim()) && Permission.DENY == Permission.parsePermFromString(items[1].trim())) {
throw new AclException(String.format("Parse resource permission error for %s.\n" +
"The expected permissions are 'SUB' or 'PUB' or 'SUB|PUB' or 'PUB|SUB'.", resource));
}
}
}
public static boolean needAdminPerm(Integer code) {
return ADMIN_CODE.contains(code);
}
......
......@@ -50,9 +50,9 @@ public class PlainPermissionManager {
private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE);
private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory();
......@@ -80,7 +80,7 @@ public class PlainPermissionManager {
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}
......@@ -89,7 +89,7 @@ public class PlainPermissionManager {
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(), plainAccessResource);
}
}
......@@ -128,12 +128,17 @@ public class PlainPermissionManager {
if (plainAccessConfig == null) {
log.error("Parameter value plainAccessConfig is null,Please check your parameter");
return false;
throw new AclException("Parameter value plainAccessConfig is null, Please check your parameter");
}
Permission.checkResourcePerms(plainAccessConfig.getTopicPerms());
Permission.checkResourcePerms(plainAccessConfig.getGroupPerms());
Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class);
if (aclAccessConfigMap == null || aclAccessConfigMap.isEmpty()) {
throw new AclException(String.format("the %s file is not found or empty", fileHome + File.separator + fileName));
}
List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
Map<String, Object> updateAccountMap = null;
if (accounts != null) {
......@@ -164,8 +169,9 @@ public class PlainPermissionManager {
return false;
}
private Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccountMap, PlainAccessConfig plainAccessConfig) {
private Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccountMap,
PlainAccessConfig plainAccessConfig) {
Map<String, Object> newAccountsMap = null;
if (existedAccountMap == null) {
newAccountsMap = new LinkedHashMap<String, Object>();
......@@ -176,8 +182,8 @@ public class PlainPermissionManager {
if (StringUtils.isEmpty(plainAccessConfig.getAccessKey()) ||
plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH) {
throw new AclException(String.format(
"The accessKey=%s cannot be null and length should longer than 6",
plainAccessConfig.getAccessKey()));
"The accessKey=%s cannot be null and length should longer than 6",
plainAccessConfig.getAccessKey()));
}
newAccountsMap.put(AclConstants.CONFIG_ACCESS_KEY, plainAccessConfig.getAccessKey());
......@@ -218,8 +224,10 @@ public class PlainPermissionManager {
}
Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class);
Map.class);
if (aclAccessConfigMap == null || aclAccessConfigMap.isEmpty()) {
throw new AclException(String.format("the %s file is not found or empty", fileHome + File.separator + fileName));
}
List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get("accounts");
if (accounts != null) {
Iterator<Map<String, Object>> itemIterator = accounts.iterator();
......@@ -251,7 +259,9 @@ public class PlainPermissionManager {
Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class);
if (aclAccessConfigMap == null || aclAccessConfigMap.isEmpty()) {
throw new AclException(String.format("the %s file is not found or empty", fileHome + File.separator + fileName));
}
List<String> globalWhiteRemoteAddrList = (List<String>) aclAccessConfigMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
if (globalWhiteRemoteAddrList != null) {
......@@ -259,7 +269,7 @@ public class PlainPermissionManager {
globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList);
// Update globalWhiteRemoteAddr element in memeory map firstly
aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS,globalWhiteRemoteAddrList);
aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS, globalWhiteRemoteAddrList);
if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
return true;
}
......@@ -275,7 +285,7 @@ public class PlainPermissionManager {
List<PlainAccessConfig> configs = new ArrayList<>();
List<String> whiteAddrs = new ArrayList<>();
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
JSONObject.class);
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
......@@ -359,7 +369,7 @@ public class PlainPermissionManager {
|| plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) {
throw new AclException(String.format(
"The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
}
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setAccessKey(plainAccessConfig.getAccessKey());
......@@ -375,7 +385,7 @@ public class PlainPermissionManager {
Permission.parseResourcePerms(plainAccessResource, true, plainAccessConfig.getTopicPerms());
plainAccessResource.setRemoteAddressStrategy(remoteAddressStrategyFactory.
getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress()));
getRemoteAddressStrategy(plainAccessResource.getWhiteRemoteAddress()));
return plainAccessResource;
}
......
......@@ -193,7 +193,7 @@ public class RemoteAddressStrategyFactory {
throw new AclException(String.format("RangeRemoteAddressStrategy netaddress examine scope Exception start is %s , end is %s", start, end));
}
}
return this.end > 0 ? true : false;
return this.end > 0;
}
private boolean ipv6Analysis(String[] strArray, int index) {
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.acl.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -165,4 +166,27 @@ public class PermissionTest {
aclException.setStatus("netaddress examine scope Exception netaddress");
Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress");
}
@Test
public void checkResourcePermsNormalTest() {
Permission.checkResourcePerms(null);
Permission.checkResourcePerms(new ArrayList<>());
Permission.checkResourcePerms(Arrays.asList("topicA=PUB"));
Permission.checkResourcePerms(Arrays.asList("topicA=PUB", "topicB=SUB", "topicC=PUB|SUB"));
}
@Test(expected = AclException.class)
public void checkResourcePermsExceptionTest1() {
Permission.checkResourcePerms(Arrays.asList("topicA"));
}
@Test(expected = AclException.class)
public void checkResourcePermsExceptionTest2() {
Permission.checkResourcePerms(Arrays.asList("topicA="));
}
@Test(expected = AclException.class)
public void checkResourcePermsExceptionTest3() {
Permission.checkResourcePerms(Arrays.asList("topicA=DENY1"));
}
}
......@@ -546,6 +546,26 @@ public class PlainAccessValidatorTest {
Assert.assertEquals(plainAccessValidator.updateAccessConfig(plainAccessConfig), false);
}
@Test(expected = AclException.class)
public void createAndUpdateAccessAclYamlConfigExceptionTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
plainAccessConfig.setAccessKey("RocketMQ33");
plainAccessConfig.setSecretKey("123456789111");
List<String> topicPerms = new ArrayList<String>();
topicPerms.add("topicB=PUB");
plainAccessConfig.setTopicPerms(topicPerms);
List<String> groupPerms = new ArrayList<String>();
groupPerms.add("groupC=DENY1");
plainAccessConfig.setGroupPerms(groupPerms);
PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
// Create element in the acl access yaml config file
plainAccessValidator.updateAccessConfig(plainAccessConfig);
}
@Test
public void updateGlobalWhiteAddrsNormalTest() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -90,7 +90,7 @@ public class AssignmentManager {
log.error("ScheduledTask: failed to pull TopicRouteData from NameServer", e);
}
}
}, 13000, this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(), TimeUnit.MILLISECONDS);
}, 200, this.brokerController.getBrokerConfig().getLoadBalancePollNameServerInterval(), TimeUnit.MILLISECONDS);
}
......
......@@ -17,9 +17,9 @@
package org.apache.rocketmq.broker.mqtrace;
public interface SendMessageHook {
public String hookName();
String hookName();
public void sendMessageBefore(final SendMessageContext context);
void sendMessageBefore(final SendMessageContext context);
public void sendMessageAfter(final SendMessageContext context);
void sendMessageAfter(final SendMessageContext context);
}
......@@ -218,6 +218,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
......
......@@ -124,7 +124,7 @@ public class BrokerOuterAPITest {
boolean success = Iterables.any(booleanList,
new Predicate<Boolean>() {
public boolean apply(Boolean input) {
return input ? true : false;
return input;
}
});
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -47,6 +47,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
......
......@@ -30,9 +30,8 @@ public class ThreadLocalIndex {
this.threadLocalIndex.set(index);
}
index = Math.abs(index + 1);
this.threadLocalIndex.set(index);
return index;
this.threadLocalIndex.set(++index);
return Math.abs(index);
}
@Override
......
......@@ -23,6 +23,10 @@ import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAverage
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
......@@ -30,10 +34,13 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
/**
......@@ -153,6 +160,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
*/
private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
/**
* Interface of asynchronous transfer data
*/
private TraceDispatcher traceDispatcher = null;
/**
* The flag for message trace
*/
private boolean enableMsgTrace = false;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic;
/**
* Default constructor.
*/
......@@ -202,13 +224,24 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void start() throws MQClientException {
setTraceDispatcher();
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
this.defaultLitePullConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
@Override
public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
if (null != traceDispatcher) {
traceDispatcher.shutdown();
}
}
@Override
......@@ -490,4 +523,36 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
}
public TraceDispatcher getTraceDispatcher() {
return traceDispatcher;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
private void setTraceDispatcher() {
if (isEnableMsgTrace()) {
try {
this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
}
......@@ -281,7 +281,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private TraceDispatcher traceDispatcher = null;
// force to use client rebalance
private boolean clientRebalance = false;
private boolean clientRebalance = true;
/**
* Default constructor.
......
......@@ -268,17 +268,23 @@ public class MQAdminImpl {
messageId.getOffset(), timeoutMillis);
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return queryMessage(topic, key, maxNum, begin, end, false);
}
public QueryResult queryMessageByUniqKey(String topic, String uniqKey, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return queryMessage(topic, uniqKey, maxNum, begin, end, true);
}
public MessageExt queryMessageByUniqKey(String topic,
String uniqKey) throws InterruptedException, MQClientException {
QueryResult qr = this.queryMessage(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
QueryResult qr = queryMessageByUniqKey(topic, uniqKey, 32,
MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE);
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
} else {
......
......@@ -41,12 +41,15 @@ import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientManager;
......@@ -115,6 +118,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000;
private DefaultLitePullConsumer defaultLitePullConsumer;
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
......@@ -142,6 +147,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
// only for test purpose, will be modified by reflection in unit test.
@SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
......@@ -161,6 +168,35 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
this.consumeMessageHookList.add(hook);
log.info("register consumeMessageHook Hook, {}", hook.hookName());
}
public void executeHookBefore(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageBefore(context);
} catch (Throwable e) {
log.error("consumeMessageHook {} executeHookBefore exception", hook.hookName(), e);
}
}
}
}
public void executeHookAfter(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageAfter(context);
} catch (Throwable e) {
log.error("consumeMessageHook {} executeHookAfter exception", hook.hookName(), e);
}
}
}
}
private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING)
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
......@@ -632,9 +668,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
private long fetchConsumeOffset(MessageQueue messageQueue) {
private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
long offset = this.rebalanceImpl.computePullFromWhere(messageQueue);
long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue);
return offset;
}
......@@ -658,7 +694,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
private long nextPullOffset(MessageQueue messageQueue) {
private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
long offset = -1;
long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
if (seekOffset != -1) {
......@@ -745,7 +781,15 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return;
}
long offset = nextPullOffset(messageQueue);
long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
} catch (MQClientException e) {
log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
}
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
......@@ -759,7 +803,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
}
PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
if (this.isCancelled() || processQueue.isDropped()) {
return;
......@@ -854,6 +898,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}
......
......@@ -293,8 +293,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
if (!pullRequest.isPreviouslyLocked()) {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
}
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
......@@ -303,7 +310,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setPreviouslyLocked(true);
pullRequest.setNextOffset(offset);
}
} else {
......@@ -577,8 +584,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
invisibleTime = 60000;
}
this.pullAPIWrapper.popAsync(popRequest.getMessageQueue(), invisibleTime, this.defaultMQPushConsumer.getPopBatchNums(),
popRequest.getConsumerGroup(), BROKER_SUSPEND_MAX_TIME_MILLIS, popCallback, true, popRequest.getInitMode(),
false, subscriptionData.getExpressionType(), subscriptionData.getSubString());
popRequest.getConsumerGroup(), BROKER_SUSPEND_MAX_TIME_MILLIS, popCallback, true, popRequest.getInitMode(),
false, subscriptionData.getExpressionType(), subscriptionData.getSubString());
} catch (Exception e) {
log.error("popAsync exception", e);
this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
......@@ -590,7 +597,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
List<MessageExt> msgFoundList = popResult.getMsgFoundList();
List<MessageExt> msgListFilterAgain = msgFoundList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()
&& popResult.getMsgFoundList().size() > 0) {
&& popResult.getMsgFoundList().size() > 0) {
msgListFilterAgain = new ArrayList<MessageExt>(popResult.getMsgFoundList().size());
for (MessageExt msg : popResult.getMsgFoundList()) {
if (msg.getTags() != null) {
......@@ -773,12 +780,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
void changePopInvisibleTimeAsync(String topic, String consumerGroup, String extraInfo, long invisibleTime, AckCallback callback)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs);
int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs);
FindBrokerResult
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
......@@ -1094,19 +1101,19 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// popInvisibleTime
if (this.defaultMQPushConsumer.getPopInvisibleTime() < MIN_POP_INVISIBLE_TIME
|| this.defaultMQPushConsumer.getPopInvisibleTime() > MAX_POP_INVISIBLE_TIME) {
|| this.defaultMQPushConsumer.getPopInvisibleTime() > MAX_POP_INVISIBLE_TIME) {
throw new MQClientException(
"popInvisibleTime Out of range [" + MIN_POP_INVISIBLE_TIME + ", " + MAX_POP_INVISIBLE_TIME + "]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
"popInvisibleTime Out of range [" + MIN_POP_INVISIBLE_TIME + ", " + MAX_POP_INVISIBLE_TIME + "]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// popBatchNums
if (this.defaultMQPushConsumer.getPopBatchNums() <= 0 || this.defaultMQPushConsumer.getPopBatchNums() > 32) {
throw new MQClientException(
"popBatchNums Out of range [1, 32]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
"popBatchNums Out of range [1, 32]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
......
......@@ -24,14 +24,14 @@ public class PullRequest implements MessageRequest {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
private long nextOffset;
private boolean lockedFirst = false;
private boolean previouslyLocked = false;
public boolean isLockedFirst() {
return lockedFirst;
public boolean isPreviouslyLocked() {
return previouslyLocked;
}
public void setLockedFirst(boolean lockedFirst) {
this.lockedFirst = lockedFirst;
public void setPreviouslyLocked(boolean previouslyLocked) {
this.previouslyLocked = previouslyLocked;
}
public String getConsumerGroup() {
......
......@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
......@@ -391,17 +392,7 @@ public abstract class RebalanceImpl {
}
private boolean getRebalanceResultFromBroker(final String topic, final boolean isOrder) {
String strategyName;
switch (messageModel) {
case BROADCASTING:
strategyName = null;
break;
case CLUSTERING:
strategyName = this.allocateMessageQueueStrategy.getName();
break;
default:
return true;
}
String strategyName = this.allocateMessageQueueStrategy.getName();
Set<MessageQueueAssignment> messageQueueAssignments;
try {
messageQueueAssignments = this.mQClientFactory.queryAssignment(topic, consumerGroup,
......@@ -422,9 +413,6 @@ public abstract class RebalanceImpl {
}
}
Set<MessageQueue> mqAll = null;
if (messageModel == MessageModel.BROADCASTING) {
mqAll = mqSet;
}
boolean changed = this.updateMessageQueueAssignment(topic, messageQueueAssignments, isOrder);
if (changed) {
log.info("broker rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, assignmentSet={}",
......@@ -705,7 +693,14 @@ public abstract class RebalanceImpl {
this.removeDirtyOffset(mq);
ProcessQueue pq = createProcessQueue();
pq.setLocked(true);
long nextOffset = this.computePullFromWhere(mq);
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
......@@ -774,8 +769,17 @@ public abstract class RebalanceImpl {
public abstract void removeDirtyOffset(final MessageQueue mq);
/**
* When the network is unstable, using this interface may return wrong offset.
* It is recommended to use computePullFromWhereWithException instead.
* @param mq
* @return offset
*/
@Deprecated
public abstract long computePullFromWhere(final MessageQueue mq);
public abstract long computePullFromWhereWithException(final MessageQueue mq) throws MQClientException;
public abstract int getConsumeInitMode();
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList, final long delay);
......
......@@ -74,8 +74,20 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
this.litePullConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Deprecated
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1L;
try {
result = computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.warn("Compute consume offset exception, mq={}", mq);
}
return result;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
long result = -1;
switch (consumeFromWhere) {
......@@ -118,7 +130,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
} else {
try {
......@@ -126,7 +139,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
}
} else {
......
......@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
......@@ -68,11 +69,17 @@ public class RebalancePullImpl extends RebalanceImpl {
this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Deprecated
@Override
public long computePullFromWhere(MessageQueue mq) {
return 0;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
return 0;
}
@Override
public int getConsumeInitMode() {
throw new UnsupportedOperationException("no initMode for Pull");
......
......@@ -115,7 +115,7 @@ public class RebalancePushImpl extends RebalanceImpl {
@Override
public boolean clientRebalance(String topic) {
// POPTODO order pop consume not implement yet
return defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || defaultMQPushConsumerImpl.isConsumeOrderly();
return defaultMQPushConsumerImpl.getDefaultMQPushConsumer().isClientRebalance() || defaultMQPushConsumerImpl.isConsumeOrderly() || MessageModel.BROADCASTING.equals(messageModel);
}
public boolean removeUnnecessaryPopMessageQueue(final MessageQueue mq, final PopProcessQueue pq) {
......@@ -149,8 +149,20 @@ public class RebalancePushImpl extends RebalanceImpl {
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
}
@Deprecated
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1L;
try {
result = computePullFromWhereWithException(mq);
} catch (MQClientException e) {
log.warn("Compute consume offset exception, mq={}", mq);
}
return result;
}
@Override
public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
......@@ -171,7 +183,8 @@ public class RebalancePushImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
}
} else {
......@@ -199,7 +212,8 @@ public class RebalancePushImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
} else {
try {
......@@ -207,7 +221,8 @@ public class RebalancePushImpl extends RebalanceImpl {
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
}
}
} else {
......
......@@ -85,7 +85,6 @@ public class TraceDataEncoder {
bean.setMsgId(line[5]);
bean.setRetryTimes(Integer.parseInt(line[6]));
bean.setKeys(line[7]);
bean.setClientHost(line[8]);
subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean);
resList.add(subBeforeContext);
......@@ -123,10 +122,9 @@ public class TraceDataEncoder {
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
bean.setClientHost(line[10]);
bean.setTransactionId(line[11]);
bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));
bean.setTransactionId(line[10]);
bean.setTransactionState(LocalTransactionState.valueOf(line[11]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[12]));
endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
endTransactionContext.getTraceBeans().add(bean);
......@@ -166,8 +164,7 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getOffsetMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);
.append(ctx.isSuccess()).append(TraceConstants.FIELD_SPLITOR);//
}
break;
case SubBefore: {
......@@ -179,8 +176,7 @@ public class TraceDataEncoder {
.append(ctx.getRequestId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getRetryTimes()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.FIELD_SPLITOR);//
.append(bean.getKeys()).append(TraceConstants.FIELD_SPLITOR);//
}
}
break;
......@@ -198,6 +194,7 @@ public class TraceDataEncoder {
}
}
break;
case EndTransaction: {
TraceBean bean = ctx.getTraceBeans().get(0);
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
......@@ -210,7 +207,6 @@ public class TraceDataEncoder {
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
......
......@@ -17,10 +17,10 @@
package org.apache.rocketmq.client.trace;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.codec.Charsets;
import org.apache.rocketmq.common.message.MessageExt;
public class TraceView {
......@@ -38,8 +38,9 @@ public class TraceView {
private String groupName;
private String status;
public static List<TraceView> decodeFromTraceTransData(String key, String messageBody) {
public static List<TraceView> decodeFromTraceTransData(String key, MessageExt messageExt) {
List<TraceView> messageTraceViewList = new ArrayList<TraceView>();
String messageBody = new String(messageExt.getBody(), Charsets.UTF_8);
if (messageBody == null || messageBody.length() <= 0) {
return messageTraceViewList;
}
......@@ -56,8 +57,7 @@ public class TraceView {
messageTraceView.setGroupName(context.getGroupName());
if (context.isSuccess()) {
messageTraceView.setStatus("success");
}
else {
} else {
messageTraceView.setStatus("failed");
}
messageTraceView.setKeys(traceBean.getKeys());
......@@ -68,7 +68,7 @@ public class TraceView {
messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId());
messageTraceView.setTimeStamp(context.getTimeStamp());
messageTraceView.setStoreHost(traceBean.getStoreHost());
messageTraceView.setClientHost(traceBean.getClientHost());
messageTraceView.setClientHost(messageExt.getBornHostString());
messageTraceViewList.add(messageTraceView);
}
return messageTraceViewList;
......
......@@ -16,10 +16,10 @@
*/
package org.apache.rocketmq.client.trace.hook;
import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceContext;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.TraceBean;
......@@ -74,7 +74,6 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
traceBean.setStoreTime(msg.getStoreTimestamp());//
traceBean.setBodyLength(msg.getStoreSize());//
traceBean.setRetryTimes(msg.getReconsumeTimes());//
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId());
traceContext.setRegionId(regionId);//
beans.add(traceBean);
}
......@@ -93,7 +92,7 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();
if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// If subbefore bean is null ,skip it
// If subBefore bean is null ,skip it
return;
}
TraceContext subAfterContext = new TraceContext();
......@@ -103,13 +102,16 @@ public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {
subAfterContext.setRequestId(subBeforeContext.getRequestId());//
subAfterContext.setSuccess(context.isSuccess());//
// Caculate the cost time for processing messages
// Calculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
Map<String, String> props = context.getProps();
if (props != null) {
String contextType = props.get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}
}
localDispatcher.append(subAfterContext);
}
......
......@@ -60,7 +60,6 @@ public class SendMessageTraceHookImpl implements SendMessageHook {
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());
traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());
tuxeContext.getTraceBeans().add(traceBean);
}
......
......@@ -57,6 +57,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
......@@ -73,6 +74,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
......@@ -80,7 +82,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
......@@ -105,6 +107,7 @@ public class DefaultMQPushConsumerTest {
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
pushConsumer.setClientRebalance(false);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
......@@ -121,8 +124,10 @@ public class DefaultMQPushConsumerTest {
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
doReturn(null).when(mQClientFactory).queryAssignment(anyString(), anyString(), anyString(), any(MessageModel.class), anyInt());
rebalanceImpl = spy(pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl());
doReturn(123L).when(rebalanceImpl).computePullFromWhereWithException(any(MessageQueue.class));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalanceImpl);
......@@ -174,7 +179,6 @@ public class DefaultMQPushConsumerTest {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class));
}
@After
......@@ -286,7 +290,7 @@ public class DefaultMQPushConsumerTest {
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
ConsumeConcurrentlyContext context) {
assertThat(msgs.get(0).getBody()).isEqualTo(msgBody);
countDownLatch.countDown();
try {
......@@ -345,4 +349,21 @@ public class DefaultMQPushConsumerTest {
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
@Test
public void testPullMessage_ExceptionOccursWhenComputePullFromWhere() throws MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(
new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(),
(msgs, context) -> {
messageExts[0] = msgs.get(0);
return null;
}));
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeOrderly(true);
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
assertThat(messageExts[0]).isNull();
}
}
......@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
......@@ -58,6 +59,11 @@ public class RebalancePushImplTest {
private String topic = "TopicA";
private final String brokerName = "BrokerA";
@Before
public void before() {
defaultMQPushConsumer.getDefaultMQPushConsumer().setClientRebalance(false);
}
@Test
public void testMessageQueueChanged_CountThreshold() {
RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, MessageModel.CLUSTERING,
......
......@@ -21,7 +21,6 @@ import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
......@@ -75,6 +74,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.waitAtMost;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
......@@ -105,6 +105,27 @@ public class DefaultMQConsumerWithOpenTracingTest {
factoryTable.forEach((s, instance) -> instance.shutdown());
factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
......@@ -128,58 +149,20 @@ public class DefaultMQConsumerWithOpenTracingTest {
// suppress updateTopicRouteInfoFromNameServer
pushConsumer.changeInstanceNameToPID();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)));
mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
FieldUtils.writeDeclaredField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
mQClientFactory = spy(mQClientFactory);
factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
}
@After
......@@ -209,7 +192,8 @@ public class DefaultMQConsumerWithOpenTracingTest {
assertThat(msg.getTopic()).isEqualTo(topic);
assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
// wait until consumeMessageAfter hook of tracer is done surely.
waitAtMost(1, TimeUnit.SECONDS).until(() -> tracer.finishedSpans().size() == 1);
MockSpan span = tracer.finishedSpans().get(0);
assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQLitePullConsumerWithTraceTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
private MQAdminImpl mQAdminImpl;
private AsyncTraceDispatcher asyncTraceDispatcher;
private DefaultMQProducer traceProducer;
private RebalanceImpl rebalanceImpl;
private OffsetStore offsetStore;
private DefaultLitePullConsumerImpl litePullConsumerImpl;
private String consumerGroup = "LitePullConsumerGroup";
private String topic = "LitePullConsumerTest";
private String brokerName = "BrokerA";
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private String customerTraceTopic = "rmq_trace_topic_12345";
@Before
public void init() throws Exception {
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
field.setAccessible(true);
RebalanceService rebalanceService = (RebalanceService) field.get(mQClientFactory);
field = RebalanceService.class.getDeclaredField("waitInterval");
field.setAccessible(true);
field.set(rebalanceService, 100);
}
@Test
public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithDefaultTraceTopic();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
@Test
public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithCustomizedTraceTopic();
try {
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createMessageQueue());
litePullConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
litePullConsumer.setPollTimeoutMillis(20 * 1000);
List<MessageExt> result = litePullConsumer.poll();
assertThat(result.get(0).getTopic()).isEqualTo(topic);
assertThat(result.get(0).getBody()).isEqualTo(new byte[] {'a'});
} finally {
litePullConsumer.shutdown();
}
}
private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setEnableMsgTrace(true);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
}
private DefaultLitePullConsumer createLitePullConsumerWithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setEnableMsgTrace(true);
litePullConsumer.setCustomizedTraceTopic(customerTraceTopic);
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
litePullConsumer.subscribe(topic, "*");
suppressUpdateTopicRouteInfoFromNameServer(litePullConsumer);
litePullConsumer.start();
initDefaultLitePullConsumer(litePullConsumer);
return litePullConsumer;
}
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
asyncTraceDispatcher = (AsyncTraceDispatcher) litePullConsumer.getTraceDispatcher();
traceProducer = asyncTraceDispatcher.getTraceProducer();
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
litePullConsumerImpl = (DefaultLitePullConsumerImpl) field.get(litePullConsumer);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(litePullConsumerImpl, mQClientFactory);
PullAPIWrapper pullAPIWrapper = litePullConsumerImpl.getPullAPIWrapper();
field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pullAPIWrapper, mQClientFactory);
Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
fieldTrace.setAccessible(true);
fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQAdminImpl);
field = DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(rebalanceImpl, mQClientFactory);
offsetStore = spy(litePullConsumerImpl.getOffsetStore());
field = DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
field.setAccessible(true);
field.set(litePullConsumerImpl, offsetStore);
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
return pullResult;
}
});
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", false));
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(), anyString());
doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), any(ReadOffsetType.class));
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
private MessageQueue createMessageQueue() {
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
return messageQueue;
}
private TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
private static void suppressUpdateTopicRouteInfoFromNameServer(DefaultLitePullConsumer litePullConsumer) throws IllegalAccessException {
DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = (DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, "defaultLitePullConsumerImpl", true);
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
}
MQClientInstance mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, (RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", true)));
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
}
}
\ No newline at end of file
......@@ -18,7 +18,6 @@
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
import org.junit.Before;
......@@ -50,8 +49,7 @@ public class TraceDataEncoderTest {
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.append(true).append(TraceConstants.FIELD_SPLITOR)
.toString();
}
......@@ -104,7 +102,6 @@ public class TraceDataEncoderTest {
traceBean.setTags("Tags");
traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
traceBean.setStoreHost("127.0.0.1:10911");
traceBean.setClientHost("127.0.0.1@41700");
traceBean.setMsgType(MessageType.Trans_msg_Commit);
traceBean.setTransactionId("transactionId");
traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
......
......@@ -17,7 +17,8 @@
package org.apache.rocketmq.client.trace;
import org.apache.rocketmq.common.UtilAll;
import org.apache.commons.codec.Charsets;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageType;
import org.junit.Assert;
import org.junit.Test;
......@@ -29,29 +30,30 @@ public class TraceViewTest {
@Test
public void testDecodeFromTraceTransData() {
String messageBody = new StringBuilder()
.append("Pub").append(TraceConstants.CONTENT_SPLITOR)
.append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR)
.append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
.append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
.append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
.append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
.append("Tags").append(TraceConstants.CONTENT_SPLITOR)
.append("Keys").append(TraceConstants.CONTENT_SPLITOR)
.append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
.append(26).append(TraceConstants.CONTENT_SPLITOR)
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.CONTENT_SPLITOR)
.append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
.toString();
.append("Pub").append(TraceConstants.CONTENT_SPLITOR)
.append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR)
.append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
.append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
.append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
.append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
.append("Tags").append(TraceConstants.CONTENT_SPLITOR)
.append("Keys").append(TraceConstants.CONTENT_SPLITOR)
.append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
.append(26).append(TraceConstants.CONTENT_SPLITOR)
.append(245).append(TraceConstants.CONTENT_SPLITOR)
.append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
.append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
.append(true).append(TraceConstants.FIELD_SPLITOR)
.toString();
MessageExt message = new MessageExt();
message.setBody(messageBody.getBytes(Charsets.UTF_8));
String key = "AC1415116D1418B4AAC217FE1B4E0000";
List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, message);
Assert.assertEquals(traceViews.size(), 1);
Assert.assertEquals(traceViews.get(0).getMsgId(), key);
key = "AD4233434334AAC217FEFFD0000";
traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
traceViews = TraceView.decodeFromTraceTransData(key, message);
Assert.assertEquals(traceViews.size(), 0);
}
}
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion {
public static final int CURRENT_VERSION = Version.V4_8_0.ordinal();
public static final int CURRENT_VERSION = Version.V4_9_0.ordinal();
public static String getVersionDesc(int value) {
int length = Version.values().length;
......
......@@ -20,10 +20,10 @@ public class PermName {
public static final int PERM_PRIORITY = 0x1 << 3;
public static final int PERM_READ = 0x1 << 2;
public static final int PERM_WRITE = 0x1 << 1;
public static final int PERM_INHERIT = 0x1 << 0;
public static final int PERM_INHERIT = 0x1;
public static String perm2String(final int perm) {
final StringBuffer sb = new StringBuffer("---");
final StringBuilder sb = new StringBuilder("---");
if (isReadable(perm)) {
sb.replace(0, 1, "R");
}
......
......@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.hook;
import java.nio.ByteBuffer;
public interface FilterCheckHook {
public String hookName();
String hookName();
public boolean isFilterMatched(final boolean isUnitMode, final ByteBuffer byteBuffer);
boolean isFilterMatched(final boolean isUnitMode, final ByteBuffer byteBuffer);
}
#!/bin/sh
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
......@@ -20,25 +20,25 @@ CURRENT_DIR="$(cd "$(dirname "$0")"; pwd)"
RMQ_DIR=$CURRENT_DIR/../..
cd $RMQ_DIR
function startNameserver() {
startNameserver() {
export JAVA_OPT_EXT=" -Xms512m -Xmx512m "
nohup bin/mqnamesrv &
}
function startBroker() {
startBroker() {
export JAVA_OPT_EXT=" -Xms1g -Xmx1g "
conf_name=$1
nohup bin/mqbroker -c $conf_name &
}
function stopNameserver() {
stopNameserver() {
PIDS=$(ps -ef|grep java|grep NamesrvStartup|grep -v grep|awk '{print $2}')
if [ ! -z "$PIDS" ]; then
kill -s TERM $PIDS
fi
}
function stopBroker() {
stopBroker() {
conf_name=$1
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
i=1
......@@ -46,7 +46,7 @@ function stopBroker() {
do
echo "Waiting to kill ..."
kill -s TERM $PIDS
((i=$i+1))
i=`expr $i + 1`
sleep 2
PIDS=$(ps -ef|grep java|grep BrokerStartup|grep $conf_name|grep -v grep|awk '{print $2}')
done
......@@ -56,7 +56,7 @@ function stopBroker() {
fi
}
function stopAll() {
stopAll() {
ps -ef|grep java|grep BrokerStartup|grep -v grep|awk '{print $2}'|xargs kill
stopNameserver
stopBroker ./conf/dledger/broker-n0.conf
......@@ -64,18 +64,18 @@ function stopAll() {
stopBroker ./conf/dledger/broker-n2.conf
}
function startAll() {
startAll() {
startNameserver
startBroker ./conf/dledger/broker-n0.conf
startBroker ./conf/dledger/broker-n1.conf
startBroker ./conf/dledger/broker-n2.conf
}
function checkConf() {
checkConf() {
if [ ! -f ./conf/dledger/broker-n0.conf -o ! -f ./conf/dledger/broker-n1.conf -o ! -f ./conf/dledger/broker-n2.conf ]; then
echo "Make sure the ./conf/dledger/broker-n0.conf, ./conf/dledger/broker-n1.conf, ./conf/dledger/broker-n2.conf exists"
exit -1
fi
exit 1
fi
}
......@@ -83,7 +83,7 @@ function checkConf() {
## Main
if [ $# -lt 1 ]; then
echo "Usage: sh $0 start|stop"
exit -1
exit 1
fi
action=$1
checkConf
......
......@@ -36,7 +36,7 @@ set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
set "JAVA_OPT=%JAVA_OPT% -XX:+AlwaysPreTouch"
set "JAVA_OPT=%JAVA_OPT% -XX:MaxDirectMemorySize=15g"
set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages -XX:-UseBiasedLocking"
set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs=%BASE_DIR%lib"
set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs=%BASE_DIR%lib;%JAVA_HOME%\jre\lib\ext"
set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
"%JAVA%" %JAVA_OPT% %*
\ No newline at end of file
......@@ -31,7 +31,7 @@ set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollect
set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails"
set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow"
set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages"
set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs=%BASE_DIR%lib"
set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs=%BASE_DIR%lib;%JAVA_HOME%\jre\lib\ext"
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
"%JAVA%" %JAVA_OPT% %*
\ No newline at end of file
......@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
......
......@@ -446,7 +446,7 @@ public class ScheduledMessageConsumer {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
......
......@@ -58,7 +58,7 @@ code |int | 请求操作码,应答方根据不同的请求码进行不同的
language | LanguageCode | 请求方实现的语言 | 应答方实现的语言
version | int | 请求方程序的版本 | 应答方程序的版本
opaque | int |相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应 | 应答不做修改直接返回
flag | int | 区分是普通RPC还是onewayRPC得标志 | 区分是普通RPC还是onewayRPC得标志
flag | int | 区分是普通RPC还是onewayRPC的标志 | 区分是普通RPC还是onewayRPC的标志
remark | String | 传输自定义文本信息 | 传输自定义文本信息
extFields | HashMap<String, String> | 请求自定义扩展信息 | 响应自定义扩展信息
......
......@@ -26,7 +26,7 @@ The Name Server boot success...
$ nohup sh bin/mqbroker -n localhost:9876 &
### 验证Name Server 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a, 192.169.1.2:10911] boot success...
```
......
## Load Balancing
Load balancing in RocketMQ is accomplished on Client side. Specifically, it can be divided into load balancing at Producer side when sending messages and load balancing at Constumer side when subscribing messages.
### 1 Producer Load Balancing
### Producer Load Balancing
When the Producer sends a message, it will first find the specified TopicPublishInfo according to Topic. After getting the routing information of TopicPublishInfo, the RocketMQ client will select a queue (MessageQueue) from the messageQueue List in TopicPublishInfo to send the message by default.Specific fault-tolerant strategies are defined in the MQFaultStrategy class.
Here is a sendLatencyFaultEnable switch variable, which, if turned on, filters out the Broker agent of not available on the basis of randomly gradually increasing modular arithmetic selection. The so-called "latencyFault Tolerance" refers to a certain period of time to avoid previous failures. For example, if the latency of the last request exceeds 550 Lms, it will evade 3000 Lms; if it exceeds 1000L, it will evade 60000 L; if it is closed, it will choose a queue (MessageQueue) to send messages by randomly gradually increasing modular arithmetic, and the latencyFault Tolerance mechanism is the key to achieve high availability of message sending.
### 2 Consumer Load Balancing
### Consumer Load Balancing
In RocketMQ, the two consumption modes (Push/Pull) on the Consumer side are both based on the pull mode to get the message, while in the Push mode it is only a kind of encapsulation of the pull mode, which is essentially implemented as the message pulling thread after pulling a batch of messages from the server. After submitting to the message consuming thread pool, it continues to try again to pull the message to the server. If the message is not pulled, the pull is delayed and continues. In both pull mode based consumption patterns (Push/Pull), the Consumer needs to know which message queue - queue from the Broker side to get the message. Therefore, it is necessary to do load balancing on the Consumer side, that is, which Consumer consumption is allocated to the same ConsumerGroup by more than one MessageQueue on the Broker side.
1, Heartbeat Packet Sending on Consumer side
#### 1 Heartbeat Packet Sending on Consumer side
After Consumer is started, it continuously sends heartbeat packets to all Broker instances in the RocketMQ cluster via timing task (which contains the message consumption group name, subscription relationship collection,Message communication mode and the value of the client id,etc). After receiving the heartbeat message from Consumer, Broker side maintains it in Consumer Manager's local caching variable—consumerTable, At the same time, the encapsulated client network channel information is stored in the local caching variable—channelInfoTable, which can provide metadata information for the later load balancing of Consumer.
2,Core Class for Load Balancing on Consumer side—RebalanceImpl
#### 2 Core Class for Load Balancing on Consumer side—RebalanceImpl
Starting the MQClientInstance instance in the startup process of the Consumer instance will complete the start of the load balancing service thread-RebalanceService (executed every 20 s). By looking at the source code, we can find that the run () method of the RebalanceService thread calls the rebalanceByTopic () method of the RebalanceImpl class, which is the core of the Consumer end load balancing. Here, rebalanceByTopic () method will do different logical processing depending on whether the consumer communication type is "broadcast mode" or "cluster mode". Here we mainly look at the main processing flow in cluster mode:
(1) Get the message consumption queue set (mqSet) under the Topic from the local cache variable—topicSubscribeInfoTable of the rebalanceImpl instance.
(2) Call mQClientFactory. findConsumerIdList () method to send a RPC communication request to Broker side to obtain the consumer Id list under the consumer group based on the parameters of topic and consumer group (consumer table constructed by Broker side based on the heartbeat data reported by the front consumer side responds and returns, business request code: GET_CONSUMER_LIST_BY_GROUP);
(3) First, the message consumption queue and the consumer Id under Topic are sorted, then the message queue to be pulled is calculated by using the message queue allocation strategy algorithm (default: the average allocation algorithm of the message queue). The average allocation algorithm here is similar to the paging algorithm. It ranks all MessageQueues like records. It ranks all consumers like pages. It calculates the average size of each page and the range of each page record. Finally, it traverses the whole range and calculates the records that the current consumer should allocate to (MessageQueue here).
##### 1) Get the message consumption queue set (mqSet) under the Topic from the local cache variable—topicSubscribeInfoTable of the rebalanceImpl instance.
##### 2) Call mQClientFactory. findConsumerIdList () method to send a RPC communication request to Broker side to obtain the consumer Id list under the consumer group based on the parameters of topic and consumer group (consumer table constructed by Broker side based on the heartbeat data reported by the front consumer side responds and returns, business request code: GET_CONSUMER_LIST_BY_GROUP);
##### 3) First, the message consumption queue and the consumer Id under Topic are sorted, then the message queue to be pulled is calculated by using the message queue allocation strategy algorithm (default: the average allocation algorithm of the message queue). The average allocation algorithm here is similar to the paging algorithm. It ranks all MessageQueues like records. It ranks all consumers like pages. It calculates the average size of each page and the range of each page record. Finally, it traverses the whole range and calculates the records that the current consumer should allocate to (MessageQueue here).
![Image text](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_8.png)
(4) Then, the updateProcessQueueTableInRebalance () method is invoked, which first compares the allocated message queue set (mqSet) with processQueueTable for filtering.
##### 4) Then, the updateProcessQueueTableInRebalance () method is invoked, which first compares the allocated message queue set (mqSet) with processQueueTable for filtering.
![Image text](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_9.png)
- The red part of the processQueueTable annotation in the figure above
......@@ -37,6 +39,6 @@ Starting the MQClientInstance instance in the startup process of the Consumer in
removeUnnecessaryMessageQueue () method to try to remove Entry as
above;
Finally, a ProcessQueue object is created for each MessageQueue in the filtered message queue set (mqSet) and stored in the processQueueTable queue of RebalanceImpl (where the computePullFromWhere (MessageQueue mq) method of the RebalanceImpl instance is invoked to obtain the next progress consumption value offset of the MessageQueue object, which is then populated into the attribute of pullRequest object to be created next time.), and create pull request object—pullRequest to add to pull list—pullRequestList, and finally execute dispatchPullRequest () method. PullRequest object of Pull message is put into the blocking queue pullRequestQueue of PullMessageService service thread in turn, and the request of Pull message is sent to Broker end after the service thread takes out. Among them, we can focus on the contrast, RebalancePushImpl and RebalancePullImpl two implementation classes dispatchPullRequest () method is different, the method in RebalancePullImpl class is empty, thus answering the last question in the previous article.
Finally, a ProcessQueue object is created for each MessageQueue in the filtered message queue set (mqSet) and stored in the processQueueTable queue of RebalanceImpl (where the computePullFromWhere (MessageQueue mq) method of the RebalanceImpl instance is invoked to obtain the next progress consumption value offset of the MessageQueue object, which is then populated into the attribute of pullRequest object to be created next time.), and create pull request object—pullRequest to add to pull list—pullRequestList, and finally execute dispatchPullRequest () method. PullRequest object of Pull message is put into the blocking queue pullRequestQueue of PullMessageService service thread in turn, and the request of Pull message is sent to Broker end after the service thread takes out.
The core design idea of message consumption queue is that a message consumption queue can only be consumed by one consumer in the same consumer group at the same time, and a message consumer can consume multiple message queues at the same time.
......@@ -19,7 +19,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -54,7 +54,7 @@ public class Consumer {
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20;
final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "false";
final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null;
final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
......@@ -190,7 +190,7 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false");
opt = new Option("p", "group suffix enable", true, "Consumer group suffix enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
......
......@@ -17,12 +17,14 @@
package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
......@@ -58,9 +60,10 @@ public class Producer {
final int tagCount = commandLine.hasOption('l') ? Integer.parseInt(commandLine.getOptionValue('l')) : 0;
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
final long messageNum = commandLine.hasOption('q') ? Long.parseLong(commandLine.getOptionValue('q')) : 0;
System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable);
System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum);
final InternalLogger log = ClientLogger.getLog();
......@@ -72,6 +75,16 @@ public class Producer {
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
final long[] msgNums = new long[threadCount];
if (messageNum > 0) {
Arrays.fill(msgNums, messageNum / threadCount);
long mod = messageNum % threadCount;
if (mod > 0) {
msgNums[0] += mod;
}
}
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
......@@ -85,14 +98,7 @@ public class Producer {
timer.scheduleAtFixedRate(new TimerTask() {
private void printStats() {
if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
doPrintStats(snapshotList, statsBenchmark, false);
}
}
......@@ -120,9 +126,14 @@ public class Producer {
producer.start();
for (int i = 0; i < threadCount; i++) {
final long msgNumLimit = msgNums[i];
if (messageNum > 0 && msgNumLimit == 0) {
break;
}
sendThreadPool.execute(new Runnable() {
@Override
public void run() {
int num = 0;
while (true) {
try {
final Message msg;
......@@ -198,10 +209,28 @@ public class Producer {
} catch (InterruptedException ignored) {
}
}
if (messageNum > 0 && ++num >= msgNumLimit) {
break;
}
}
}
});
}
try {
sendThreadPool.shutdown();
sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
timer.cancel();
if (snapshotList.size() > 1) {
doPrintStats(snapshotList, statsBenchmark, true);
} else {
System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n",
statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get());
}
producer.shutdown();
} catch (InterruptedException e) {
log.error("[Exit] Thread Interrupted Exception", e);
}
}
public static Options buildCommandlineOptions(final Options options) {
......@@ -233,6 +262,10 @@ public class Producer {
opt.setRequired(false);
options.addOption(opt);
opt = new Option("q", "messageQuantity", true, "Send message quantity, Default: 0, running forever");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......@@ -249,6 +282,23 @@ public class Producer {
return msg;
}
private static void doPrintStats(final LinkedList<Long[]> snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
if (done) {
System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
} else {
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
}
}
}
class StatsBenchmarkProducer {
......
......@@ -55,6 +55,7 @@ public class PopPushConsumer {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.setClientRebalance(false);
consumer.start();
System.out.printf("Consumer Started.%n");
}
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -36,8 +36,8 @@ public class SimpleCharStream {
* Position in buffer.
*/
public int bufpos = -1;
protected int bufline[];
protected int bufcolumn[];
protected int[] bufline;
protected int[] bufcolumn;
protected int column = 0;
protected int line = 1;
......@@ -62,8 +62,8 @@ public class SimpleCharStream {
protected void ExpandBuff(boolean wrapAround) {
char[] newbuffer = new char[bufsize + 2048];
int newbufline[] = new int[bufsize + 2048];
int newbufcolumn[] = new int[bufsize + 2048];
int[] newbufline = new int[bufsize + 2048];
int[] newbufcolumn = new int[bufsize + 2048];
try {
if (wrapAround) {
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -116,7 +116,6 @@ public class Level implements Serializable {
if (s.equals(OFF_NAME)) {
return Level.OFF;
}
if (s.equals(INFO_NAME)) {
return Level.INFO;
}
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -140,6 +140,11 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
final PutKVConfigRequestHeader requestHeader =
(PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
if (requestHeader.getNamespace() == null || requestHeader.getKey() == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("namespace or key is null");
return response;
}
this.namesrvController.getKvConfigManager().putKVConfig(
requestHeader.getNamespace(),
requestHeader.getKey(),
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -29,7 +29,7 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
......@@ -439,6 +439,12 @@
<version>3.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
......@@ -536,7 +542,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.69</version>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,6 +19,7 @@ package org.apache.rocketmq.remoting.common;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.Inet6Address;
......@@ -98,6 +99,10 @@ public class RemotingUtil {
ArrayList<String> ipv6Result = new ArrayList<String>();
while (enumeration.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration.nextElement();
if (isBridge(networkInterface)) {
continue;
}
final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
while (en.hasMoreElements()) {
final InetAddress address = en.nextElement();
......@@ -160,6 +165,19 @@ public class RemotingUtil {
return sb.toString();
}
private static boolean isBridge(NetworkInterface networkInterface) {
try {
if (isLinuxPlatform()) {
String interfaceName = networkInterface.getName();
File file = new File("/sys/class/net/" + interfaceName + "/bridge");
return file.exists();
}
} catch (SecurityException e) {
//Ignore
}
return false;
}
public static SocketChannel connect(SocketAddress remote) {
return connect(remote, 1000 * 5);
}
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -152,6 +152,8 @@ public class MessageStoreConfig {
private boolean isEnableBatchPush = false;
private boolean enableScheduleMessageStats = true;
public boolean isDebugLockEnable() {
return debugLockEnable;
}
......@@ -722,4 +724,12 @@ public class MessageStoreConfig {
public void setEnableBatchPush(boolean enableBatchPush) {
isEnableBatchPush = enableBatchPush;
}
public boolean isEnableScheduleMessageStats() {
return enableScheduleMessageStats;
}
public void setEnableScheduleMessageStats(boolean enableScheduleMessageStats) {
this.enableScheduleMessageStats = enableScheduleMessageStats;
}
}
......@@ -426,17 +426,18 @@ public class DLedgerCommitLog extends CommitLog {
AppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
long elapsedTimeInLock;
long queueOffset;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(msg);
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status));
}
encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
......@@ -542,6 +543,12 @@ public class DLedgerCommitLog extends CommitLog {
BatchAppendFuture<AppendEntryResponse> dledgerFuture;
EncodeResult encodeResult;
encodeResult = this.messageSerializer.serialize(messageExtBatch);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
msgIdBuilder.setLength(0);
long elapsedTimeInLock;
......@@ -549,12 +556,8 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(messageExtBatch);
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult
.status));
}
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
......@@ -664,7 +667,7 @@ public class DLedgerCommitLog extends CommitLog {
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset);
encodeResult.setQueueOffsetKey(queueOffset, false);
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
......@@ -779,7 +782,8 @@ public class DLedgerCommitLog extends CommitLog {
long msgNum = 0;
try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
encodeResult.setQueueOffsetKey(queueOffset, true);
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
......@@ -957,8 +961,15 @@ public class DLedgerCommitLog extends CommitLog {
this.queueOffsetKey = queueOffsetKey;
}
public void setQueueOffsetKey(long offset) {
data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
public void setQueueOffsetKey(long offset, boolean isBatch) {
if (!isBatch) {
this.data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
return;
}
for (byte[] data : batchData) {
ByteBuffer.wrap(data).putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset++);
}
}
public byte[] getData() {
......@@ -977,8 +988,6 @@ public class DLedgerCommitLog extends CommitLog {
// The maximum length of the message
private final int maxMessageSize;
// Build Message Key
private final StringBuilder keyBuilder = new StringBuilder();
MessageSerializer(final int size) {
this.maxMessageSize = size;
......@@ -1079,17 +1088,7 @@ public class DLedgerCommitLog extends CommitLog {
}
public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
keyBuilder.setLength(0);
keyBuilder.append(messageExtBatch.getTopic());
keyBuilder.append('-');
keyBuilder.append(messageExtBatch.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
}
String key = messageExtBatch.getTopic() + "-" + messageExtBatch.getQueueId();
int totalMsgLen = 0;
ByteBuffer messagesByteBuff = messageExtBatch.wrap();
......@@ -1154,7 +1153,7 @@ public class DLedgerCommitLog extends CommitLog {
// 5 FLAG
msgStoreItemMemory.putInt(flag);
// 6 QUEUEOFFSET
msgStoreItemMemory.putLong(queueOffset++);
msgStoreItemMemory.putLong(0L);
// 7 PHYSICALOFFSET
msgStoreItemMemory.putLong(0);
// 8 SYSFLAG
......@@ -1210,6 +1209,7 @@ public class DLedgerCommitLog extends CommitLog {
this.sbr = sbr;
}
@Override
public synchronized void release() {
super.release();
if (sbr != null) {
......
......@@ -115,6 +115,7 @@ public class ScheduleMessageService extends ConfigManager {
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
......@@ -330,6 +331,12 @@ public class ScheduleMessageService extends ConfigManager {
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
putMessageResult.getAppendMessageResult().getWroteBytes());
ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
}
continue;
} else {
// XXX: warn and notify me
......
......@@ -40,6 +40,9 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
......@@ -112,6 +115,10 @@ public class ScheduleMessageServiceTest {
@Test
public void deliverDelayedMessageTimerTaskTest() throws Exception {
assertThat(messageStore.getMessageStoreConfig().isEnableScheduleMessageStats()).isTrue();
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic)).isNull();
MessageExtBrokerInner msg = buildMessage();
int realQueueId = msg.getQueueId();
// set delayLevel,and send delay message
......@@ -141,6 +148,10 @@ public class ScheduleMessageServiceTest {
// now,found the message
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
// get the stats change
assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().get()).isEqualTo(1);
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().get()).isEqualTo(1L);
assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().get()).isEqualTo(messageResult.getBufferTotalSize());
// get the message body
ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.test.client.rmq;
import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQPopConsumer extends RMQNormalConsumer {
public RMQPopConsumer(String nsAddr, String topic, String subExpression,
String consumerGroup, AbstractListener listner) {
super(nsAddr, topic, subExpression, consumerGroup, listner);
}
@Override
public void create() {
super.create();
consumer.setClientRebalance(false);
}
}
......@@ -22,6 +22,7 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.listener.AbstractListener;
......@@ -62,6 +63,15 @@ public class ConsumerFactory {
consumer.start();
return consumer;
}
public static RMQPopConsumer getRMQPopConsumer(String nsAddr, String consumerGroup,
String topic, String subExpression,
AbstractListener listener) {
RMQPopConsumer consumer = new RMQPopConsumer(nsAddr, topic, subExpression,
consumerGroup, listener);
consumer.create();
consumer.start();
return consumer;
}
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, String consumerGroup) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
......
......@@ -33,7 +33,7 @@ public class FileUtil {
this.fileName = fileName;
}
public static void main(String args[]) {
public static void main(String[] args) {
String filePath = FileUtil.class.getResource("/").getPath();
String fileName = "test.txt";
FileUtil fileUtil = new FileUtil(filePath, fileName);
......
......@@ -76,7 +76,7 @@ public class MQWait {
}
}
public static void main(String args[]) {
public static void main(String[] args) {
long start = System.currentTimeMillis();
MQWait.setCondition(new Condition() {
......
......@@ -100,14 +100,14 @@ public final class RandomUtil {
return n + res % (m - n);
}
private static char getChar(int arg[]) {
private static char getChar(int[] arg) {
int size = arg.length;
int c = rd.nextInt(size / 2);
c = c * 2;
return (char) (getIntegerBetween(arg[c], arg[c + 1]));
}
private static String getString(int n, int arg[]) {
private static String getString(int n, int[] arg) {
StringBuilder res = new StringBuilder();
for (int i = 0; i < n; i++) {
res.append(getChar(arg));
......@@ -116,17 +116,17 @@ public final class RandomUtil {
}
public static String getStringWithCharacter(int n) {
int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
int[] arg = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
return getString(n, arg);
}
public static String getStringWithNumber(int n) {
int arg[] = new int[] {'0', '9' + 1};
int[] arg = new int[] {'0', '9' + 1};
return getString(n, arg);
}
public static String getStringWithNumAndCha(int n) {
int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1, '0', '9' + 1};
int[] arg = new int[] {'a', 'z' + 1, 'A', 'Z' + 1, '0', '9' + 1};
return getString(n, arg);
}
......
......@@ -45,16 +45,16 @@ public class RandomUtils {
}
public static String getStringWithNumber(int n) {
int arg[] = new int[] {'0', '9' + 1};
int[] arg = new int[] {'0', '9' + 1};
return getString(n, arg);
}
public static String getStringWithCharacter(int n) {
int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
int[] arg = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
return getString(n, arg);
}
private static String getString(int n, int arg[]) {
private static String getString(int n, int[] arg) {
StringBuilder res = new StringBuilder();
for (int i = 0; i < n; i++) {
res.append(getChar(arg));
......@@ -62,7 +62,7 @@ public class RandomUtils {
return res.toString();
}
private static char getChar(int arg[]) {
private static char getChar(int[] arg) {
int size = arg.length;
int c = rd.nextInt(size / 2);
c = c * 2;
......
......@@ -140,7 +140,7 @@ public class VerifyUtils {
return rtExpect;
}
public static void main(String args[]) {
public static void main(String[] args) {
verifyBalance(400, 0.1f, 230, 190);
}
}
......@@ -43,7 +43,7 @@ public class ListDataCollectorImpl implements DataCollector {
return datas;
}
public void resetData() {
public synchronized void resetData() {
datas.clear();
unlockIncrement();
}
......@@ -67,7 +67,7 @@ public class ListDataCollectorImpl implements DataCollector {
return Collections.frequency(datas, data) == 1;
}
public Collection<Object> getAllDataWithoutDuplicate() {
public synchronized Collection<Object> getAllDataWithoutDuplicate() {
return new HashSet<Object>(datas);
}
......@@ -81,7 +81,7 @@ public class ListDataCollectorImpl implements DataCollector {
return res;
}
public void removeData(Object data) {
public synchronized void removeData(Object data) {
datas.remove(data);
}
......
......@@ -133,6 +133,7 @@ public class IntegrationTestBase {
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(nsAddr);
brokerConfig.setEnablePropertyFilter(true);
brokerConfig.setLoadBalancePollNameServerInterval(500);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.test.client.consumer.balance;
import org.apache.log4j.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
......@@ -29,11 +28,13 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.truth.Truth.assertThat;
public class NormalMsgStaticBalanceIT extends BaseConf {
private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class);
private static Logger logger = LoggerFactory.getLogger(NormalMsgStaticBalanceIT.class);
private RMQNormalProducer producer = null;
private String topic = null;
......@@ -75,13 +76,12 @@ public class NormalMsgStaticBalanceIT extends BaseConf {
@Test
public void testFourConsumersBalance() {
int msgSize = 600;
RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListener());
RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListener());
String consumerGroup = initConsumerGroup();
logger.info("use group: {}", consumerGroup);
RMQNormalConsumer consumer1 = getConsumer(nsAddr, consumerGroup, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumerGroup, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumerGroup, topic, "*", new RMQNormalListener());
RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumerGroup, topic, "*", new RMQNormalListener());
TestUtils.waitForSeconds(waitTime);
producer.send(msgSize);
......
......@@ -22,8 +22,9 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.logging.inner.Logger;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.test.util.RandomUtil;
import org.apache.rocketmq.test.util.VerifyUtils;
......@@ -64,12 +65,14 @@ public class PopSubCheckIT extends BaseConf {
RMQNormalProducer producer = getProducer(nsAddr, topic);
producer.getProducer().setCompressMsgBodyOverHowmuch(Integer.MAX_VALUE);
RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
for (String brokerAddr : new String[]{brokerController1.getBrokerAddr(), brokerController2.getBrokerAddr()}) {
defaultMQAdminExt.setMessageRequestMode(brokerAddr, topic, group, MessageRequestMode.POP, 8, 60_000);
}
RMQPopConsumer consumer = ConsumerFactory.getRMQPopConsumer(nsAddr, group,
topic, "*", new RMQNormalListener());
mqClients.add(consumer);
int msgNum = 1;
producer.send(msgNum);
Assert.assertEquals("Not all sent succeeded", msgNum, producer.getAllUndupMsgBody().size());
......@@ -80,7 +83,7 @@ public class PopSubCheckIT extends BaseConf {
.containsExactlyElementsIn(producer.getAllMsgBody());
for (Object o : consumer.getListener().getAllOriginMsg()) {
MessageClientExt msg = (MessageClientExt) o;
assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).isNotEmpty();
assertThat(msg.getProperty(MessageConst.PROPERTY_POP_CK)).named("check pop meta").isNotEmpty();
}
consumer.getListener().waitForMessageConsume(msgNum, 3_000 * 9);
......
......@@ -92,7 +92,7 @@ public class MulTagSubIT extends BaseConf {
@Test
public void testSubTwoTabAndMatchTwo() {
String tags[] = {"jueyin1", "jueyin2"};
String[] tags = {"jueyin1", "jueyin2"};
String subExpress = String.format("%s||%s", tags[0], tags[1]);
int msgSize = 10;
......@@ -113,7 +113,7 @@ public class MulTagSubIT extends BaseConf {
@Test
public void testSubThreeTabAndMatchTwo() {
String tags[] = {"jueyin1", "jueyin2", "jueyin3"};
String[] tags = {"jueyin1", "jueyin2", "jueyin3"};
String subExpress = String.format("%s||%s", tags[0], tags[1]);
int msgSize = 10;
......@@ -135,7 +135,7 @@ public class MulTagSubIT extends BaseConf {
@Test
public void testNoMatch() {
String tags[] = {"jueyin1", "jueyin2", "jueyin3"};
String[] tags = {"jueyin1", "jueyin2", "jueyin3"};
String subExpress = "no_match";
int msgSize = 10;
......
......@@ -84,7 +84,7 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
@Test
public void testSendMessagesWithTwoTag() {
String tags[] = {"jueyin1", "jueyin2"};
String[] tags = {"jueyin1", "jueyin2"};
int msgSize = 10;
TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
......@@ -113,7 +113,7 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
@Test
public void testTwoConsumerOneMatchOneOtherMatchAll() {
String tags[] = {"jueyin1", "jueyin2"};
String[] tags = {"jueyin1", "jueyin2"};
String sub1 = String.format("%s||%s", tags[0], tags[1]);
String sub2 = String.format("%s|| noExist", tags[0]);
int msgSize = 10;
......@@ -144,7 +144,7 @@ public class TagMessageWithMulConsumerIT extends BaseConf {
@Test
public void testSubKindsOf() {
String tags[] = {"jueyin1", "jueyin2"};
String[] tags = {"jueyin1", "jueyin2"};
String sub1 = String.format("%s||%s", tags[0], tags[1]);
String sub2 = String.format("%s|| noExist", tags[0]);
String sub3 = tags[0];
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.9.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -129,12 +129,18 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
}
public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end);
}
@Override
public void start() throws MQClientException {
defaultMQAdminExtImpl.start();
......
......@@ -999,12 +999,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
long end) throws MQClientException,
InterruptedException {
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin,
long end) throws MQClientException, InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessageByUniqKey(topic, key, maxNum, begin, end);
}
@Override
public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
long offset) throws RemotingException, InterruptedException, MQBrokerException {
......
......@@ -38,7 +38,7 @@ import org.apache.rocketmq.tools.command.SubCommandException;
public class CLusterSendMsgRTCommand implements SubCommand {
public static void main(String args[]) {
public static void main(String[] args) {
}
@Override
......
......@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
......@@ -57,82 +58,43 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
}
}
public static void queryById(final DefaultMQAdminExt admin, final String topic,
final String msgId) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException, IOException {
MessageExt msg = admin.viewMessage(topic, msgId);
String bodyTmpFilePath = createBodyFile(msg);
System.out.printf("%-20s %s%n",
"Topic:",
msg.getTopic()
);
System.out.printf("%-20s %s%n",
"Tags:",
"[" + msg.getTags() + "]"
);
System.out.printf("%-20s %s%n",
"Keys:",
"[" + msg.getKeys() + "]"
);
System.out.printf("%-20s %d%n",
"Queue ID:",
msg.getQueueId()
);
System.out.printf("%-20s %d%n",
"Queue Offset:",
msg.getQueueOffset()
);
System.out.printf("%-20s %d%n",
"CommitLog Offset:",
msg.getCommitLogOffset()
);
System.out.printf("%-20s %d%n",
"Reconsume Times:",
msg.getReconsumeTimes()
);
System.out.printf("%-20s %s%n",
"Born Timestamp:",
UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())
);
System.out.printf("%-20s %s%n",
"Store Timestamp:",
UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())
);
System.out.printf("%-20s %s%n",
"Born Host:",
RemotingHelper.parseSocketAddressAddr(msg.getBornHost())
);
System.out.printf("%-20s %s%n",
"Store Host:",
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())
);
System.out.printf("%-20s %d%n",
"System Flag:",
msg.getSysFlag()
);
System.out.printf("%-20s %s%n",
"Properties:",
msg.getProperties() != null ? msg.getProperties().toString() : ""
);
System.out.printf("%-20s %s%n",
"Message Body Path:",
bodyTmpFilePath
);
public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId,
final boolean showAll) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException, IOException {
QueryResult queryResult = admin.queryMessageByUniqKey(topic, msgId, 32, 0, Long.MAX_VALUE);
assert queryResult != null;
List<MessageExt> list = queryResult.getMessageList();
if (list == null || list.size() == 0) {
return;
}
list.sort((o1, o2) -> (int) (o1.getStoreTimestamp() - o2.getStoreTimestamp()));
for (int i = 0; i < (showAll ? list.size() : 1); i++) {
showMessage(admin, list.get(i), i);
}
}
private static void showMessage(final DefaultMQAdminExt admin, MessageExt msg, int index) throws IOException {
String bodyTmpFilePath = createBodyFile(msg, index);
final String strFormat = "%-20s %s%n";
final String intFormat = "%-20s %d%n";
System.out.printf(strFormat, "Topic:", msg.getTopic());
System.out.printf(strFormat, "Tags:", "[" + msg.getTags() + "]");
System.out.printf(strFormat, "Keys:", "[" + msg.getKeys() + "]");
System.out.printf(intFormat, "Queue ID:", msg.getQueueId());
System.out.printf(intFormat, "Queue Offset:", msg.getQueueOffset());
System.out.printf(intFormat, "CommitLog Offset:", msg.getCommitLogOffset());
System.out.printf(intFormat, "Reconsume Times:", msg.getReconsumeTimes());
System.out.printf(strFormat, "Born Timestamp:", UtilAll.timeMillisToHumanString2(msg.getBornTimestamp()));
System.out.printf(strFormat, "Store Timestamp:", UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp()));
System.out.printf(strFormat, "Born Host:", RemotingHelper.parseSocketAddressAddr(msg.getBornHost()));
System.out.printf(strFormat, "Store Host:", RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()));
System.out.printf(intFormat, "System Flag:", msg.getSysFlag());
System.out.printf(strFormat, "Properties:",
msg.getProperties() != null ? msg.getProperties().toString() : "");
System.out.printf(strFormat, "Message Body Path:", bodyTmpFilePath);
try {
List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
......@@ -149,18 +111,21 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
}
}
private static String createBodyFile(MessageExt msg) throws IOException {
private static String createBodyFile(MessageExt msg, int index) throws IOException {
DataOutputStream dos = null;
try {
String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
File file = new File(bodyTmpFilePath);
StringBuffer bodyTmpFilePath = new StringBuffer("/tmp/rocketmq/msgbodys");
File file = new File(bodyTmpFilePath.toString());
if (!file.exists()) {
file.mkdirs();
}
bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
bodyTmpFilePath.append("/").append(msg.getMsgId());
if (index > 0) {
bodyTmpFilePath.append("_" + index);
}
dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath.toString()));
dos.write(msg.getBody());
return bodyTmpFilePath;
return bodyTmpFilePath.toString();
} finally {
if (dos != null)
dos.close();
......@@ -195,6 +160,10 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
opt = new Option("a", "showAll", false, "Print all message, the limit is 32");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......@@ -202,11 +171,11 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
try {
defaultMQAdminExt = createMQAdminExt(rpcHook);
final String msgId = commandLine.getOptionValue('i').trim();
final String topic = commandLine.getOptionValue('t').trim();
final boolean showAll = commandLine.hasOption('a');
if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
final String consumerGroup = commandLine.getOptionValue('g').trim();
final String clientId = commandLine.getOptionValue('d').trim();
......@@ -214,7 +183,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
System.out.printf("%s", result);
} else {
queryById(defaultMQAdminExt, topic, msgId);
queryById(defaultMQAdminExt, topic, msgId, showAll);
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
......
......@@ -19,7 +19,6 @@ package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.codec.Charsets;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -38,7 +37,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class QueryMsgTraceByIdSubCommand implements SubCommand {
@Override
......@@ -46,6 +44,10 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
Option opt = new Option("i", "msgId", true, "Message Id");
opt.setRequired(true);
options.addOption(opt);
opt = new Option("t", "traceTopic", true, "The name value of message trace topic");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......@@ -65,7 +67,11 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
final String msgId = commandLine.getOptionValue('i').trim();
this.queryTraceByMsgId(defaultMQAdminExt, msgId);
String traceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
if (commandLine.hasOption('t')) {
traceTopic = commandLine.getOptionValue('t').trim();
}
this.queryTraceByMsgId(defaultMQAdminExt, traceTopic, msgId);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + "command failed", e);
} finally {
......@@ -73,14 +79,14 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
}
}
private void queryTraceByMsgId(final DefaultMQAdminExt admin, String msgId)
throws MQClientException, InterruptedException {
private void queryTraceByMsgId(final DefaultMQAdminExt admin, String traceTopic, String msgId)
throws MQClientException, InterruptedException {
admin.start();
QueryResult queryResult = admin.queryMessage(TopicValidator.RMQ_SYS_TRACE_TOPIC, msgId, 64, 0, System.currentTimeMillis());
QueryResult queryResult = admin.queryMessage(traceTopic, msgId, 64, 0, System.currentTimeMillis());
List<MessageExt> messageList = queryResult.getMessageList();
List<TraceView> traceViews = new ArrayList<>();
for (MessageExt message : messageList) {
List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, new String(message.getBody(), Charsets.UTF_8));
List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, message);
traceViews.addAll(traceView);
}
......@@ -92,20 +98,20 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
for (TraceView traceView : traceViews) {
if (traceView.getMsgType().equals(TraceType.Pub.name())) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"#Type",
"#ProducerGroup",
"#ClientHost",
"#SendTime",
"#CostTimes",
"#Status"
"#Type",
"#ProducerGroup",
"#ClientHost",
"#SendTime",
"#CostTimes",
"#Status"
);
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"Pub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
"Pub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
);
System.out.printf("\n");
}
......@@ -124,22 +130,22 @@ public class QueryMsgTraceByIdSubCommand implements SubCommand {
Iterator<String> consumers = consumerTraceMap.keySet().iterator();
while (consumers.hasNext()) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"#Type",
"#ConsumerGroup",
"#ClientHost",
"#ConsumerTime",
"#CostTimes",
"#Status"
"#Type",
"#ConsumerGroup",
"#ClientHost",
"#ConsumerTime",
"#CostTimes",
"#Status"
);
List<TraceView> consumerTraces = consumerTraceMap.get(consumers.next());
for (TraceView traceView : consumerTraces) {
System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
"Sub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
"Sub",
traceView.getGroupName(),
traceView.getClientHost(),
DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
traceView.getCostTime() + "ms",
traceView.getStatus()
);
}
System.out.printf("\n");
......
......@@ -19,14 +19,23 @@ package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TopicRouteSubCommand implements SubCommand {
private static final String FORMAT = "%-45s %-32s %-50s %-10s %-11s %-5s%n";
@Override
public String commandName() {
return "topicRoute";
......@@ -43,6 +52,9 @@ public class TopicRouteSubCommand implements SubCommand {
opt.setRequired(true);
options.addOption(opt);
opt = new Option("l", "list", false, "Use list format to print data");
opt.setRequired(false);
options.addOption(opt);
return options;
}
......@@ -58,12 +70,46 @@ public class TopicRouteSubCommand implements SubCommand {
String topic = commandLine.getOptionValue('t').trim();
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
String json = topicRouteData.toJson(true);
System.out.printf("%s%n", json);
printData(topicRouteData, commandLine.hasOption('l'));
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
private void printData(TopicRouteData topicRouteData, boolean useListFormat) {
if (!useListFormat) {
System.out.printf("%s%n", topicRouteData.toJson(true));
return;
}
int totalReadQueue = 0, totalWriteQueue = 0;
List<QueueData> queueDataList = topicRouteData.getQueueDatas();
Map<String /*brokerName*/, QueueData> map = new HashMap<>();
for (QueueData queueData : queueDataList) {
map.put(queueData.getBrokerName(), queueData);
}
queueDataList.sort(Comparator.comparing(QueueData::getBrokerName));
List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
brokerDataList.sort(Comparator.comparing(BrokerData::getBrokerName));
System.out.printf(FORMAT, "#ClusterName", "#BrokerName", "#BrokerAddrs", "#ReadQueue", "#WriteQueue", "#Perm");
for (BrokerData brokerData : brokerDataList) {
String brokerName = brokerData.getBrokerName();
QueueData queueData = map.get(brokerName);
totalReadQueue += queueData.getReadQueueNums();
totalWriteQueue += queueData.getWriteQueueNums();
System.out.printf(FORMAT, brokerData.getCluster(), brokerName, brokerData.getBrokerAddrs(),
queueData.getReadQueueNums(), queueData.getWriteQueueNums(), queueData.getPerm());
}
for (int i = 0; i < 158; i++) {
System.out.print("-");
}
System.out.printf("%n");
System.out.printf(FORMAT, "Total:", map.keySet().size(), "", totalReadQueue, totalWriteQueue, "");
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册