Unverified commit 70da3e01 authored by rongtong, committed by GitHub

Merge branch 'develop' into master

......@@ -33,3 +33,4 @@ script:
after_success:
- mvn clean install -Pit-test
- mvn sonar:sonar -Psonar-apache
- bash <(curl -s https://codecov.io/bash) || echo 'Codecov failed to upload'
## Apache RocketMQ
[![Build Status](https://travis-ci.org/apache/rocketmq.svg?branch=master)](https://travis-ci.org/apache/rocketmq) [![Coverage Status](https://coveralls.io/repos/github/apache/rocketmq/badge.svg?branch=master)](https://coveralls.io/github/apache/rocketmq?branch=master)
[![CodeCov](https://codecov.io/gh/apache/rocketmq/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/rocketmq)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.rocketmq/rocketmq-all/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Corg.apache.rocketmq)
[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://rocketmq.apache.org/dowloading/releases)
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
......
......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -1240,6 +1240,7 @@ public class BrokerController {
}
}
public ExecutorService getSendMessageExecutor() {
return sendMessageExecutor;
}
}
......@@ -70,7 +70,8 @@ public class Broker2Client {
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}", group, messageExt.getMsgId(), e.getMessage());
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
group, messageExt.getMsgId(), e.toString());
}
}
......@@ -96,7 +97,7 @@ public class Broker2Client {
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());
}
}
......@@ -185,14 +186,14 @@ public class Broker2Client {
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
topic, group, entry.getValue().getClientId());
} catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={}",
new Object[] {topic, group}, e);
log.error("[reset-offset] reset offset exception. topic={}, group={} ,error={}",
topic, group, e.toString());
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. version={}",
log.warn("[reset-offset] the client does not support this feature. channel={}, version={}",
RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return response;
}
......@@ -253,7 +254,7 @@ public class Broker2Client {
result.setCode(ResponseCode.SYSTEM_ERROR);
result.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[get-consumer-status] the client does not support this feature. version={}",
log.warn("[get-consumer-status] the client does not support this feature. channel={}, version={}",
RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
return result;
} else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
......@@ -279,8 +280,8 @@ public class Broker2Client {
}
} catch (Exception e) {
log.error(
"[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
new Object[] {topic, group}, e);
"[get-consumer-status] get consumer status exception. topic={}, group={}, error={}",
topic, group, e.toString());
}
if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
......
......@@ -47,7 +47,7 @@ public class ConsumerFilterManager extends ConfigManager {
private static final long MS_24_HOUR = 24 * 3600 * 1000;
private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256);
filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);
private transient BrokerController brokerController;
private transient BloomFilter bloomFilter;
......
......@@ -299,7 +299,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
......@@ -715,6 +717,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
......
......@@ -79,7 +79,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
asyncProcessRequest(ctx, request).thenAccept(responseCallback::callback);
asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
......@@ -281,6 +281,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
return this.defaultLitePullConsumerImpl.committed(messageQueue);
return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
}
@Override
......@@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
this.defaultLitePullConsumerImpl.seekToBegin(messageQueue);
this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue));
}
@Override
public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
this.defaultLitePullConsumerImpl.seekToEnd(messageQueue);
this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue));
}
@Override
......
......@@ -20,8 +20,7 @@ import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
/**
* A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one
* thread
* A MessageListenerOrderly object is used to receive messages orderly. One queue by one thread
*/
public interface MessageListenerOrderly extends MessageListener {
/**
......
......@@ -643,72 +643,64 @@ public class MQClientAPIImpl {
final Message msg,
final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
SendStatus sendStatus;
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
case ResponseCode.FLUSH_SLAVE_TIMEOUT:
case ResponseCode.FLUSH_DISK_TIMEOUT: {
sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
break;
}
case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
break;
}
case ResponseCode.SLAVE_NOT_AVAILABLE: {
sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
break;
}
case ResponseCode.SUCCESS: {
SendStatus sendStatus = SendStatus.SEND_OK;
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
break;
case ResponseCode.FLUSH_SLAVE_TIMEOUT:
sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
break;
case ResponseCode.SLAVE_NOT_AVAILABLE:
sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
break;
case ResponseCode.SUCCESS:
sendStatus = SendStatus.SEND_OK;
break;
default:
assert false;
break;
}
sendStatus = SendStatus.SEND_OK;
break;
}
default: {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
}
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
StringBuilder sb = new StringBuilder();
for (Message message : (MessageBatch) msg) {
sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
}
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
if (traceOn != null && traceOn.equals("false")) {
sendResult.setTraceOn(false);
} else {
sendResult.setTraceOn(true);
}
sendResult.setRegionId(regionId);
return sendResult;
}
default:
break;
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
StringBuilder sb = new StringBuilder();
for (Message message : (MessageBatch) msg) {
sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
}
uniqMsgId = sb.toString();
}
SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
if (traceOn != null && traceOn.equals("false")) {
sendResult.setTraceOn(false);
} else {
sendResult.setTraceOn(true);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
sendResult.setRegionId(regionId);
return sendResult;
}
public PullResult pullMessage(
......@@ -1366,7 +1358,7 @@ public class MQClientAPIImpl {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
......
......@@ -452,7 +452,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
......
......@@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2)
throw new MQClientException("Fetch consume offset from broker exception", null);
return offset;
......
......@@ -296,7 +296,7 @@ public class ProcessQueue {
}
}
public List<MessageExt> takeMessags(final int batchSize) {
public List<MessageExt> takeMessages(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.impl.factory;
import java.io.UnsupportedEncodingException;
import java.net.DatagramSocket;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
......@@ -118,7 +117,6 @@ public class MQClientInstance {
private final ConsumerStatsManager consumerStatsManager;
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
private ServiceState serviceState = ServiceState.CREATE_JUST;
private DatagramSocket datagramSocket;
private Random random = new Random();
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
......@@ -854,10 +852,6 @@ public class MQClientInstance {
this.mQClientAPIImpl.shutdown();
this.rebalanceService.shutdown();
if (this.datagramSocket != null) {
this.datagramSocket.close();
this.datagramSocket = null;
}
MQClientManager.getInstance().removeClientFactory(this.clientId);
log.info("the client factory [{}] shutdown OK", this.clientId);
break;
......
......@@ -258,7 +258,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
this.timer.cancel();
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
......@@ -1356,27 +1356,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
requestResponseFuture.setSendRequestOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.setSendRequestOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
......@@ -1395,7 +1386,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
requestResponseFuture.setSendRequestOk(true);
}
@Override
......@@ -1421,27 +1412,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
requestResponseFuture.setSendRequestOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.setSendRequestOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
......@@ -1461,7 +1443,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
requestResponseFuture.setSendRequestOk(true);
}
@Override
......@@ -1487,32 +1469,36 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
requestResponseFuture.setSendRequestOk(true);
}
@Override
public void onException(Throwable e) {
requestResponseFuture.setSendReqeustOk(false);
requestResponseFuture.setSendRequestOk(false);
requestResponseFuture.putResponseMessage(null);
requestResponseFuture.setCause(e);
}
}, null, timeout - cost);
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
return waitResponse(msg, timeout, requestResponseFuture, cost);
} finally {
RequestFutureTable.getRequestFutureTable().remove(correlationId);
}
}
private Message waitResponse(Message msg, long timeout, RequestResponseFuture requestResponseFuture, long cost) throws InterruptedException, RequestTimeoutException, MQClientException {
Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
if (responseMessage == null) {
if (requestResponseFuture.isSendRequestOk()) {
throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
"send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
} else {
throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
}
}
return responseMessage;
}
public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
long beginTimestamp = System.currentTimeMillis();
......@@ -1526,7 +1512,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendReqeustOk(true);
requestResponseFuture.setSendRequestOk(true);
}
@Override
......@@ -1540,7 +1526,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private void requestFail(final String correlationId) {
RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId);
if (responseFuture != null) {
responseFuture.setSendReqeustOk(false);
responseFuture.setSendRequestOk(false);
responseFuture.putResponseMessage(null);
try {
responseFuture.executeRequestCallback();
......
......@@ -103,8 +103,8 @@ public class RequestResponseFuture {
return sendRequestOk;
}
public void setSendReqeustOk(boolean sendReqeustOk) {
this.sendRequestOk = sendReqeustOk;
public void setSendRequestOk(boolean sendRequestOk) {
this.sendRequestOk = sendRequestOk;
}
public Message getRequestMsg() {
......
......@@ -38,7 +38,7 @@ public class ProcessQueueTest {
assertThat(pq.getMsgCount().get()).isEqualTo(100);
pq.takeMessags(10);
pq.takeMessages(10);
pq.commit();
assertThat(pq.getMsgCount().get()).isEqualTo(90);
......@@ -55,7 +55,7 @@ public class ProcessQueueTest {
assertThat(pq.getMsgSize().get()).isEqualTo(100 * 123);
pq.takeMessags(10);
pq.takeMessages(10);
pq.commit();
assertThat(pq.getMsgSize().get()).isEqualTo(90 * 123);
......@@ -74,17 +74,17 @@ public class ProcessQueueTest {
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(12);
pq.takeMessags(10000);
pq.takeMessages(10000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(10);
pq.takeMessags(10000);
pq.takeMessages(10000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(9);
pq.takeMessags(80000);
pq.takeMessages(80000);
pq.commit();
pq.fillProcessQueueInfo(processQueueInfo);
assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
......
......@@ -373,7 +373,7 @@ public class DefaultMQProducerTest {
assertThat(responseMap).isNotNull();
for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
RequestResponseFuture future = entry.getValue();
future.setSendReqeustOk(true);
future.setSendRequestOk(true);
message.setFlag(1);
future.getRequestCallback().onSuccess(message);
}
......
......@@ -37,7 +37,7 @@ public class RequestResponseFutureTest {
@Override public void onException(Throwable e) {
}
});
future.setSendReqeustOk(true);
future.setSendRequestOk(true);
future.executeRequestCallback();
assertThat(cc.get()).isEqualTo(1);
}
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -184,6 +184,8 @@ public class BrokerConfig {
private boolean storeReplyMessageEnable = true;
private boolean autoDeleteUnusedStats = false;
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
......@@ -793,4 +795,12 @@ public class BrokerConfig {
public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
this.storeReplyMessageEnable = storeReplyMessageEnable;
}
public boolean isAutoDeleteUnusedStats() {
return autoDeleteUnusedStats;
}
public void setAutoDeleteUnusedStats(boolean autoDeleteUnusedStats) {
this.autoDeleteUnusedStats = autoDeleteUnusedStats;
}
}
......@@ -414,6 +414,9 @@ public class MessageDecoder {
final String name = entry.getKey();
final String value = entry.getValue();
if (value == null) {
continue;
}
sb.append(name);
sb.append(NAME_VALUE_SEPARATOR);
sb.append(value);
......
......@@ -142,18 +142,20 @@ public class MessageExt extends Message {
}
public String getBornHostString() {
if (this.bornHost != null) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
return inetSocketAddress.getAddress().getHostAddress();
if (null != this.bornHost) {
InetAddress inetAddress = ((InetSocketAddress) this.bornHost).getAddress();
return null != inetAddress ? inetAddress.getHostAddress() : null;
}
return null;
}
public String getBornHostNameString() {
if (this.bornHost != null) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
return inetSocketAddress.getAddress().getHostName();
if (null != this.bornHost) {
InetAddress inetAddress = ((InetSocketAddress) this.bornHost).getAddress();
return null != inetAddress ? inetAddress.getHostName() : null;
}
return null;
......
......@@ -89,7 +89,7 @@ public class RequestCode {
public static final int REGISTER_BROKER = 103;
public static final int UNREGISTER_BROKER = 104;
public static final int GET_ROUTEINTO_BY_TOPIC = 105;
public static final int GET_ROUTEINFO_BY_TOPIC = 105;
public static final int GET_BROKER_CLUSTER_INFO = 106;
public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
......
......@@ -74,6 +74,26 @@ public class MomentStatsItemSet {
statsItem.getValue().set(value);
}
public void delValueByInfixKey(final String statsKey, String separator) {
Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MomentStatsItem> next = it.next();
if (next.getKey().contains(separator + statsKey + separator)) {
it.remove();
}
}
}
public void delValueBySuffixKey(final String statsKey, String separator) {
Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MomentStatsItem> next = it.next();
if (next.getKey().endsWith(separator + statsKey)) {
it.remove();
}
}
}
public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
......
......@@ -158,6 +158,43 @@ public class StatsItemSet {
statsItem.getTimes().addAndGet(incTimes);
}
public void delValue(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null != statsItem) {
this.statsItemTable.remove(statsKey);
}
}
public void delValueByPrefixKey(final String statsKey, String separator) {
Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, StatsItem> next = it.next();
if (next.getKey().startsWith(statsKey + separator)) {
it.remove();
}
}
}
public void delValueByInfixKey(final String statsKey, String separator) {
Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, StatsItem> next = it.next();
if (next.getKey().contains(separator + statsKey + separator)) {
it.remove();
}
}
}
public void delValueBySuffixKey(final String statsKey, String separator) {
Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, StatsItem> next = it.next();
if (next.getKey().endsWith(separator + statsKey)) {
it.remove();
}
}
}
public StatsItem getAndCreateStatsItem(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
......
......@@ -37,11 +37,13 @@ public class BrokerConfigTest {
brokerConfig.setBrokerId(0);
brokerConfig.setBrokerClusterName("DefaultCluster");
brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4");
brokerConfig.setAutoDeleteUnusedStats(true);
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4");
assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
assertThat(brokerConfig.isAutoDeleteUnusedStats()).isEqualTo(true);
}
}
\ No newline at end of file
......@@ -246,4 +246,23 @@ public class MessageDecoderTest {
assertThat("abc").isEqualTo(decodedMsg.getTopic());
}
}
public void testNullValueProperty() throws Exception {
MessageExt msg = new MessageExt();
msg.setBody("x".getBytes());
msg.setTopic("x");
msg.setBornHost(new InetSocketAddress("127.0.0.1", 9000));
msg.setStoreHost(new InetSocketAddress("127.0.0.1", 9000));
String key = "NullValueKey";
msg.putProperty(key, null);
try {
byte[] encode = MessageDecoder.encode(msg, false);
MessageExt decode = MessageDecoder.decode(ByteBuffer.wrap(encode));
assertThat(decode.getProperty(key)).isNull();
} catch (Exception e) {
e.printStackTrace();
assertThat(Boolean.FALSE).isTrue();
}
}
}
\ No newline at end of file
......@@ -42,4 +42,4 @@ fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/tools.sh org.apache.rocketmq.tools.command.MQAdminStartup $@
sh ${ROCKETMQ_HOME}/bin/tools.sh org.apache.rocketmq.tools.command.MQAdminStartup "$@"
......@@ -40,4 +40,4 @@ JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
$JAVA ${JAVA_OPT} $@
$JAVA ${JAVA_OPT} "$@"
......@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
......
......@@ -500,48 +500,52 @@ try {
The complexity only grows when you send a large batch, and you may not be sure whether it exceeds the size limit (4MB). In that case, you had better split the message list:
```java
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // add 20 bytes of log overhead
if (tmpSize > SIZE_LIMIT) {
// a single message exceeded the size limit
// let it go here, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
// if the next sublist has no element, add this one and then break; otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // add 20 bytes of log overhead
return tmpSize;
}
}
// split the large message list into several smaller ones
ListSplitter splitter = new ListSplitter(messages);
......@@ -702,7 +706,7 @@ public class TransactionProducer {
```
#### 2. Implement the transaction listener interface
When the half message is sent successfully, we use the `executeLocalTransaction` method to execute the local transaction. It returns one of the three transaction states mentioned in the previous section. The `checkLocalTranscation` method is used to check the local transaction state and respond to the broker's check request; it also returns one of the three transaction states mentioned in the previous section.
When the half message is sent successfully, we use the `executeLocalTransaction` method to execute the local transaction. It returns one of the three transaction states mentioned in the previous section. The `checkLocalTransaction` method is used to check the local transaction state and respond to the broker's check request; it also returns one of the three transaction states mentioned in the previous section.
```java
public class TransactionListenerImpl implements TransactionListener {
......@@ -737,8 +741,8 @@ public class TransactionListenerImpl implements TransactionListener {
### 6.2 Usage constraints of transactional messages
1. Transactional messages do not support delayed messages or batch messages.
2. To avoid a single message being checked too many times and causing the half-queue to accumulate, we limit the number of checks for a single message to 15 by default, but users can change this limit with the `transactionCheckMax` parameter in the Broker configuration file. If a message has been checked more than N times (N = `transactionCheckMax`), the Broker will discard the message and, by default, print an error log at the same time. Users can change this behavior by overriding the `AbstractTransactionCheckListener` class.
3. A transactional message will be checked after the specific period of time determined by the parameter transactionMsgTimeout in the Broker configuration file. When sending a transactional message, users can also change this limit by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS, which takes precedence over the `transactionMsgTimeout` parameter.
2. To avoid a single message being checked too many times and causing the half-queue to accumulate, we limit the number of checks for a single message to 15 by default, but users can change this limit with the `transactionCheckMax` parameter in the Broker configuration file. If a message has been checked more than N times (N = `transactionCheckMax`), the Broker will discard the message and, by default, print an error log at the same time. Users can change this behavior by overriding the `AbstractTransactionalMessageCheckListener` class.
3. A transactional message will be checked after the specific period of time determined by the parameter transactionTimeout in the Broker configuration file. When sending a transactional message, users can also change this limit by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS, which takes precedence over the `transactionTimeout` parameter.
4. A transactional message may be checked or consumed more than once.
5. A committed message re-delivered to the user's target topic may fail; currently this depends on the log record. High availability is ensured by RocketMQ's own high-availability mechanism. To make sure transactional messages are not lost and transaction integrity is guaranteed, a synchronous double-write mechanism is recommended.
6. Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of messages, transactional messages allow backward queries; the MQ server can query clients by their producer IDs.
......
......@@ -564,7 +564,7 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
<td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>Clear the broker's write permission from the NameServer</td>
<td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>Broker address, in the format ip:port</td>
<td class=xl68 width=87 style='width:65pt'>BrokerName</td>
</tr>
<tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
......@@ -1161,7 +1161,7 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
</tr>
<tr height=39 style='height:29.0pt'>
<td rowspan=3 height=119 class=xl69 width=87 style='border-bottom:1.0pt
height:89.0pt;border-top:none;width:65pt'>consumerConnec tion</td>
height:89.0pt;border-top:none;width:65pt'>consumerConnection</td>
<td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>Query the consumer's network connections</td>
<td class=xl67 width=87 style='width:65pt'>-g</td>
......@@ -1177,7 +1177,7 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker
</tr>
<tr height=39 style='height:29.0pt'>
<td rowspan=4 height=142 class=xl69 width=87 style='border-bottom:1.0pt
height:106.0pt;border-top:none;width:65pt'>producerConnec tion</td>
height:106.0pt;border-top:none;width:65pt'>producerConnection</td>
<td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>Query the producer's network connections</td>
<td class=xl67 width=87 style='width:65pt'>-g</td>
......
# The "Request-Reply" Feature
---
## 1 Use cases
As services grow, a single-machine service can no longer meet performance and capacity requirements, and the service has to be split into finer-grained services or deployed as a cluster of multiple instances. In distributed scenarios, RPC is the most common form of online invocation.
When building distributed applications, some domains, such as financial services, often use a message queue to build a service bus and implement online invocation on top of it. The main message-queue scenarios are decoupling and peak shaving; to use a message queue for online invocation, the service call must be abstracted into message-based interaction and the synchronous-call interaction logic must be strengthened. To better support message queues in online invocation scenarios, rocketmq-4.7.0 introduces the "Request-Reply" feature to support RPC-style calls.
## 2 Design
In rocketmq, a complete synchronous call consists of two steps:
(1) the requester creates a message, sends it to the responder, and waits for the responder's reply;
(2) after receiving the request message, the responder consumes it and sends a reply message back to the requester.
The whole process is essentially a combination of two message send/receive flows, so the key problem is how to build a synchronous procedure out of asynchronous message flows. Two questions need to be answered:
### 2.1 How the requester waits synchronously for the reply
A key data structure in the solution to this problem is RequestResponseFuture.
```
public class RequestResponseFuture {
private final String correlationId;
private final RequestCallback requestCallback;
private final long beginTimestamp = System.currentTimeMillis();
private final Message requestMsg = null;
private long timeoutMillis;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile Message responseMsg = null;
private volatile boolean sendRequestOk = true;
private volatile Throwable cause = null;
}
```
In RequestResponseFuture, a correlationId identifies a request. As shown in the figure below, when the producer sends a request it creates a RequestResponseFuture, stores it in a map with the correlationId as the key and the RequestResponseFuture as the value, and carries the correlationId in the request. When the reply arrives, the corresponding RequestResponseFuture is looked up by correlationId and the reply content is set on it (a minimal illustrative sketch of this pattern follows the figure).
![](image/producer_send_request.png)
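A minimal, hypothetical sketch of this correlation pattern (the class and method names below are illustrative, not the actual client implementation):
```
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.rocketmq.common.message.Message;

// Illustrative only: park a future keyed by correlationId before sending the
// request; the reply-handling callback completes it and releases the waiter.
public class CorrelationTableSketch {
    private final Map<String, CompletableFuture<Message>> pendingRequests = new ConcurrentHashMap<>();

    // Called by the requesting side before the request message is sent.
    public CompletableFuture<Message> register(String correlationId) {
        CompletableFuture<Message> future = new CompletableFuture<>();
        pendingRequests.put(correlationId, future);
        return future;
    }

    // Called by the client's reply-message processor when a reply arrives.
    public void onReply(String correlationId, Message replyMessage) {
        CompletableFuture<Message> future = pendingRequests.remove(correlationId);
        if (future != null) {
            future.complete(replyMessage);
        }
    }

    // The synchronous request() call simply blocks on the future with a timeout.
    public Message await(String correlationId, long timeoutMillis)
        throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<Message> future = pendingRequests.get(correlationId);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pendingRequests.remove(correlationId);
        }
    }
}
```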
### 2.2 How the consumer replies accurately after consuming the message
(1) When the producer sends a request message, it generates a unique identifier for the message and also attaches the producer's clientId. After the consumer receives and consumes the message, it extracts the message identifier correlationId and the producer identifier clientId from the message and puts them into the reply message, so that the reply can be matched to its request and routed to the producer that issued it. The reply message also carries the message type and the reply topic; the consumer then sends it to the broker, as shown below.
![](image/consumer_reply.png)
(2) After the broker receives the reply message, it needs to send the message back to the specified producer. How does the broker know which producer to send it to? The message contains the producer's clientId, and ProducerManager maintains the mapping between this identifier and channel information; through this mapping, the broker can deliver the reply to the corresponding producer.
The difference from ordinary message delivery is that a reply message is not pulled by the producer; the broker pushes it to the producer directly. The broker-selection strategy also changes: the reply message is sent to the same broker that the request message came from.
After the producer receives the reply message, it uses the unique identifier in the message to find the corresponding RequestResponseFuture in the map, sets the reply message, and decrements the latch to release the waiting thread, so the requester receives the reply.
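A purely illustrative sketch of this routing idea (hypothetical class; in the real broker the lookup lives in ProducerManager and the reply processors):
```
import java.util.concurrent.ConcurrentHashMap;

import io.netty.channel.Channel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Illustrative only: the broker keeps a clientId -> Channel table and pushes
// the reply over the channel of the producer that issued the request.
public class ReplyRoutingSketch {
    private final ConcurrentHashMap<String, Channel> producerChannels = new ConcurrentHashMap<>();

    // Registered when the producer establishes its connection to the broker.
    public void registerProducer(String clientId, Channel channel) {
        producerChannels.put(clientId, channel);
    }

    // Pushed directly by the broker; the producer does not pull reply messages.
    public boolean pushReply(String producerClientId, RemotingCommand replyCommand) {
        Channel channel = producerChannels.get(producerClientId);
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(replyCommand);
            return true;
        }
        return false;
    }
}
```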
## 3 Usage
Examples of synchronous calls are in the rpc directory of the example module.
### 3.1 Producer
```
Message msg = new Message(topic,
"",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
long begin = System.currentTimeMillis();
Message retMsg = producer.request(msg, ttl);
long cost = System.currentTimeMillis() - begin;
System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
```
Simply replace the send call with request.
### 3.2 Consumer
Start a producer, and when overriding the consumeMessage method, build a custom reply message and send it.
```
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) {
try {
System.out.printf("handle message: %s", msg.toString());
String replyTo = MessageUtil.getReplyToClient(msg);
byte[] replyContent = "reply message contents.".getBytes();
// create reply message with given util, do not create reply message by yourself
Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);
// send reply message with producer
SendResult replyResult = replyProducer.send(replyMessage, 3000);
System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
```
## 4 API parameters
4.1 public Message request(Message msg, long timeout)
msg: the message to send
timeout: timeout of the synchronous call
4.2 public void request(Message msg, final RequestCallback requestCallback, long timeout)
msg: the message to send
requestCallback: the callback invoked with the reply
timeout: timeout of the call
4.3 public Message request(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout)
msg: the message to send
selector: the message queue selector
arg: the argument passed to the message queue selector
timeout: timeout of the synchronous call
4.4 public void request(final Message msg, final MessageQueueSelector selector, final Object arg, final RequestCallback requestCallback, final long timeout)
msg: the message to send
selector: the message queue selector
arg: the argument passed to the message queue selector
requestCallback: the callback invoked with the reply
timeout: timeout of the call
4.5 public Message request(final Message msg, final MessageQueue mq, final long timeout)
msg: the message to send
mq: the target message queue
timeout: timeout of the synchronous call
4.6 public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
msg: the message to send
mq: the target message queue
requestCallback: the callback invoked with the reply
timeout: timeout of the call
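As an illustration of 4.2, a hedged usage sketch (it assumes `producer` is a started DefaultMQProducer, `topic` already exists, and the checked exceptions declared by `request` are handled by the caller):
```
Message msg = new Message(topic, "", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.request(msg, new RequestCallback() {
    @Override
    public void onSuccess(Message message) {
        // invoked asynchronously when the reply message arrives
        System.out.printf("receive reply message: %s %n", message);
    }

    @Override
    public void onException(Throwable e) {
        e.printStackTrace();
    }
}, 3000);
```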
......@@ -424,7 +424,7 @@ Before introducing the mqadmin management tool, the following points need to be
<td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>Clear write permissions for broker from nameServer</td>
<td class=xl67 width=87 style='width:65pt'>-b</td>
<td class=xl68 width=87 style='width:65pt'>Declare the address of the broker and format as ip:port</td>
<td class=xl68 width=87 style='width:65pt'>Declare the BrokerName</td>
</tr>
<tr height=57 style='height:43.0pt'>
<td height=57 class=xl67 width=87 style='height:43.0pt;width:65pt'>-n</td>
......@@ -1053,7 +1053,7 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=39 style='height:29.0pt'>
<td rowspan=3 height=119 class=xl69 width=87 style='border-bottom:1.0pt
height:89.0pt;border-top:none;width:65pt'>consumerConnec tion</td>
height:89.0pt;border-top:none;width:65pt'>consumerConnection</td>
<td rowspan=3 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>Query the network connection of consumer</td>
<td class=xl67 width=87 style='width:65pt'>-g</td>
......@@ -1069,7 +1069,7 @@ Before introducing the mqadmin management tool, the following points need to be
</tr>
<tr height=39 style='height:29.0pt'>
<td rowspan=4 height=142 class=xl69 width=87 style='border-bottom:1.0pt
height:106.0pt;border-top:none;width:65pt'>producerConnec tion</td>
height:106.0pt;border-top:none;width:65pt'>producerConnection</td>
<td rowspan=4 class=xl72 width=87 style='border-bottom:1.0pt
border-top:none;width:65pt'>Query the network connection of producer</td>
<td class=xl67 width=87 style='width:65pt'>-g</td>
......
......@@ -19,47 +19,52 @@ try {
### 2 Split into Lists
The complexity only grows when you send a large batch, and you may not be sure whether it exceeds the size limit (4MiB). In that case, you'd better split the list:
```java
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000;
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if (nextIndex - currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
break;
} else {
totalSize += tmpSize;
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
return tmpSize;
}
}
// then you could split the large list into small ones:
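// Hypothetical continuation (assumes `producer` is a started DefaultMQProducer):
// iterate the splitter and send each sublist as one batch.
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
    try {
        List<Message> listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
        // handle the error
    }
}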
......
......@@ -88,8 +88,8 @@ public class TransactionListenerImpl implements TransactionListener {
## 3 Usage Constraint
1. Transactional messages have no schedule or batch support.
2. In order to avoid a single message being checked too many times and lead to half queue message accumulation, we limited the number of checks for a single message to 15 times by default, but users can change this limit by change the ```transactionCheckMax``` parameter in the configuration of the broker, if one message has been checked over ```transactionCheckMax``` times, broker will discard this message and print an error log at the same time by default. Users can change this behavior by override the ```AbstractTransactionCheckListener``` class.
3. A transactional message will be checked after a certain period of time that determined by parameter ```transactionTimeout``` in the configuration of the broker. And users also can change this limit by set user property “CHECK_IMMUNITY_TIME_IN_SECONDS” when sending transactional message, this parameter takes precedence over the “transactionMsgTimeout” parameter.
2. In order to avoid a single message being checked too many times and leading to half-queue message accumulation, we limit the number of checks for a single message to 15 times by default, but users can change this limit via the ```transactionCheckMax``` parameter in the broker configuration. If one message has been checked over ```transactionCheckMax``` times, the broker will discard this message and print an error log by default. Users can change this behavior by overriding the ```AbstractTransactionalMessageCheckListener``` class.
3. A transactional message will be checked after a certain period of time determined by the parameter ```transactionTimeout``` in the broker configuration. Users can also change this limit by setting the user property ```CHECK_IMMUNITY_TIME_IN_SECONDS``` when sending the transactional message; this parameter takes precedence over the ```transactionTimeout``` parameter (see the sketch after this list).
4. A transactional message maybe checked or consumed more than once.
5. A committed message reput to the user's target topic may fail. Currently, this depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that the transactional message isn't lost and transaction integrity is guaranteed, it is recommended to use a synchronous double-write mechanism.
6. Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of messages, transactional messages allow backward queries; the MQ server queries clients by their producer IDs.
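A hedged sketch of constraint 3 above (illustrative only; it assumes MessageConst exposes PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS and uses MessageAccessor because putUserProperty rejects reserved property names):
```java
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;

public class CheckImmunityExample {
    public static Message newHalfMessage(byte[] body) {
        Message msg = new Message("TransactionTopicTest", "TagA", body);
        // Ask the broker to delay back-checks for this particular half message
        // by 120 seconds, overriding the broker-wide transactionTimeout.
        MessageAccessor.putProperty(msg,
            MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "120");
        return msg;
    }
}
```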
......
Apache RocketMQ Developer Guide
--------
##### This guide helps developers understand and use Apache RocketMQ quickly.
### 1. Concepts & Features
- [Concept](Concept.md): introduces the basic concepts in RocketMQ.
- [Feature](Feature.md): introduces the functional features of RocketMQ's implementation.
### 2. Architecture Design
- [Architecture](architecture.md): introduces RocketMQ's deployment and technical architecture.
- [Design](design.md): introduces the design of RocketMQ's key mechanisms, including message storage, communication mechanisms, message filtering, load balancing, transactional messages, etc.
### 3. Example
- [Example](RocketMQ_Example.md): introduces RocketMQ's common usage, including the basic example, sequence message example, delay message example, batch message example, filter message example, transaction message example, etc.
### 4. Best Practice
- [Best Practice](best_practice.md): introduces RocketMQ best practices for producers, consumers, brokers, NameServer, client configuration, and the recommended JVM and Linux parameter configuration.
- [Message Trace](msg_trace/user_guide.md): introduces how to use RocketMQ's message tracing feature.
- [Auth Management](acl/Operations_ACL.md): introduces how to quickly deploy and use a RocketMQ cluster with the auth management feature enabled.
- [Quick Start](dledger/quick_start.md): introduces how to quickly deploy DLedger.
- [Cluster Deployment](dledger/deploy_guide.md): introduces how to deploy DLedger in a cluster.
### 5. Operation and maintenance management
- [Operation](operation.md): introduces RocketMQ's deployment modes, including single-master mode, multi-master mode, multi-master multi-slave mode and so on, as well as the usage of the operation tool mqadmin.
### 6. API Reference (TODO)
- [DefaultMQProducer API Reference](client/java/API_Reference_DefaultMQProducer.md)
### Examples List
- [basic example](Example_Simple.md)
- [sequence message example](Example_Orderly.md)
- [delay message example](Example_Delay.md)
- [batch message example](Example_Batch.md)
- [filter message example](Example_Filter.md)
- [transaction message example](Example_Transaction.md)
- [openmessaging example](Example_OpenMessaging.md)
# Dledger cluster deployment
---
## Preface
This document introduces how to deploy an auto-failover RocketMQ-on-DLedger group.
A RocketMQ-on-DLedger group is a group of brokers with the **same name**. It needs at least 3 nodes; a leader is elected automatically by the Raft algorithm and the other nodes act as followers, with data replicated between leader and followers for high availability.
A RocketMQ-on-DLedger group can fail over automatically and stays consistent.
RocketMQ-on-DLedger groups can scale out horizontally, that is, any number of RocketMQ-on-DLedger groups can be deployed to provide services externally.
## 1. New cluster deployment
### 1.1 Write the configuration
Each RocketMQ-on-DLedger group needs at least 3 machines (this document assumes 3).
Write 3 configuration files; it is advised to refer to the example configuration files in the conf/dledger directory.
Key configuration items:
| name | meaning | example |
| --- | --- | --- |
| enableDLegerCommitLog | whether to enable DLedger | true |
| dLegerGroup | name of the DLedger Raft group; it is advised to keep it consistent with brokerName | RaftNode00 |
| dLegerPeers | address and port information of the nodes in the DLedger group; this configuration must be consistent across nodes of the same group | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
| dLegerSelfId | node id; must be one of dLegerPeers and unique within the group | n0 |
| sendMessageThreadPoolNums | number of message-sending threads; it is advised to set it to the number of CPU cores | 16 |
The following is an example configuration, conf/dledger/broker-n0.conf.
```
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
```
### 1.2 Start Broker
Startup is the same as in older versions.
`nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf & `
`nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf & `
`nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf & `
## 2. Upgrade old cluster
If the old cluster is deployed in master-only mode, each master needs to be transformed into a RocketMQ-on-DLedger group.
If the old cluster is deployed in master-slave mode, each master-slave group needs to be transformed into a RocketMQ-on-DLedger group.
### 2.1 Kill old Broker
Execute the kill command, or call `bin/mqshutdown broker`.
### 2.2 Check old Commitlog
Each node in a RocketMQ-on-DLedger group is compatible with the old commitlog, but the Raft replication process only covers newly appended messages. So, to avoid exceptions, the old commitlogs must be consistent.
If the old cluster is deployed in master-slave mode, the commitlogs may be inconsistent after shutdown. It is advised to use md5sum to check at least the 2 most recent commitlog files; if they are inconsistent, make them consistent by copying.
Although a RocketMQ-on-DLedger group can be deployed with 2 nodes, it lacks failover ability (at least 3 nodes are needed to tolerate the failure of one node).
Make sure the master's and slave's commitlogs are consistent, then prepare 3 machines and copy the old commitlog from the master to these 3 machines (also copy the config directory).
Then go ahead and set the configurations.
### 2.3 Modify configuration
Refer to New cluster deployment.
### 2.4 Restart Broker
Refer to New cluster deployment.
# Dledger Quick Deployment
---
### Preface
This document mainly introduces how to build and deploy an auto-failover RocketMQ cluster based on DLedger.
For detailed documentation on new cluster deployment and old cluster upgrade, please refer to the [Deployment Guide](deploy_guide.md).
### 1. Build from source code
The build phase contains two parts: first build DLedger, then build RocketMQ.
#### 1.1 Build DLedger
`git clone https://github.com/openmessaging/openmessaging-storage-dledger.git`
`cd openmessaging-storage-dledger`
`mvn clean install -DskipTests`
#### 1.2 Build RocketMQ
`git clone https://github.com/apache/rocketmq.git`
`cd rocketmq`
`git checkout -b store_with_dledger origin/store_with_dledger`
`mvn -Prelease-all -DskipTests clean install -U`
### 2. Quick Deployment
After the build succeeds:
`cd distribution/target/apache-rocketmq`
`sh bin/dledger/fast-try.sh start`
If the above commands execute successfully, check the cluster status with the mqadmin operation commands.
`sh bin/mqadmin clusterList -n 127.0.0.1:9876`
If everything goes well, the following content will appear:
![ClusterList](https://img.alicdn.com/5476e8b07b923/TB11Z.ZyCzqK1RjSZFLXXcn2XXa)
(BID 0 indicates the master; the others are followers)
After startup succeeds, a producer can produce messages, and the failover scenario can then be tested.
To stop the cluster quickly, execute the following command:
`sh bin/dledger/fast-try.sh stop`
For quick deployment, the default configuration is in the conf/dledger directory and the default storage path is /tmp/rmqstore.
### 3. Failover
After successful deployment, kill the leader process (in the example above, the process that binds port 30931); after about 10 seconds, use the clusterList command to check the cluster status, and the leader will have switched to another node.
# message trace
----
## 1. Message trace data's key properties
| Producer End| Consumer End| Broker End|
| --- | --- | --- |
| produce message | consume message | message's topic |
| send message time | delivery time, delivery rounds  | message store location |
| whether the message was sent successfully | whether message was consumed successfully | message's key |
| send cost-time | consume cost-time | message's tag value |
## 2. Enable message trace in cluster deployment
### 2.1 Broker's configuration file
The following broker properties file configuration enables message trace:
```
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if msg tracing is open,the flag will be true
traceTopicEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876
```
### 2.2 Common mode
Each broker node in the RocketMQ cluster is used to store the message trace data that clients collect and send, so there are no requirements or limitations on the number of broker nodes in the RocketMQ cluster.
### 2.3 IO physical isolation mode
For scenarios with huge amounts of message trace data, one Broker node in the RocketMQ cluster can be dedicated to storing message trace data, so that the IO of ordinary message data is physically isolated from the IO of trace data and the two do not affect each other. In this mode, the RocketMQ cluster must have at least two Broker nodes, one of which is designated to store message trace data.
### 2.4 Start Broker that enable message trace
`nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &`
## 3. Topics used for storing message trace data
RocketMQ's message trace feature supports two ways of storing trace data.
### 3.1 System level TraceTopic
By default, message trace data is stored in the system-level TraceTopic (topic name: **RMQ_SYS_TRACE_TOPIC**). This topic is created at broker startup (as mentioned above, set **traceTopicEnable** to **true** in the Broker's configuration).
### 3.2 User defined TraceTopic
If users do not want to store message trace data in the system-level TraceTopic, they can create a user-defined TraceTopic (that is, an ordinary topic created to store message trace data). The following part introduces how the client SDK supports a user-defined TraceTopic.
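As a sketch, such a topic could be created in advance with the mqadmin tool; the cluster name, topic name, and name server address below are placeholders:
`sh bin/mqadmin updateTopic -c DefaultCluster -t Topic_test11111 -n XX.XX.XX.XX:9876`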
## 4. Client SDK demo with message trace feature
To make it easy for business systems to adopt RocketMQ's message trace feature, the client SDK provides, by design, a switch parameter (**enableMsgTrace**) to enable message trace and a custom parameter (**customizedTraceTopic**) to specify a user-defined TraceTopic.
### 4.1 Enable message trace when sending messages
```
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
producer.setNamesrvAddr("XX.XX.XX.XX:9876");
producer.start();
try {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
} catch (Exception e) {
    e.printStackTrace();
}
```
### 4.2 Enable message trace when subscribing to messages
```
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.setNamesrvAddr("XX.XX.XX.XX:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");
```
### 4.3 User-defined topic supporting message trace
Adjust the instantiation of DefaultMQProducer and DefaultMQPushConsumer as shown in the following code to support a user-defined TraceTopic.
```
// Topic_test11111 should be created by the user and is used for storing message trace data.
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
......
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
......
```
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -51,12 +51,12 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.filter.expression;
/**
* BooleanConstantExpression
*/
public class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
public BooleanConstantExpression(Object value) {
super(value);
}
public boolean matches(EvaluationContext context) throws Exception {
Object object = evaluate(context);
return object != null && object == Boolean.TRUE;
}
}
......@@ -90,11 +90,11 @@ public abstract class ComparisonExpression extends BinaryExpression implements B
}
public static BooleanExpression createIsNull(Expression left) {
return doCreateEqual(left, ConstantExpression.NULL);
return doCreateEqual(left, BooleanConstantExpression.NULL);
}
public static BooleanExpression createIsNotNull(Expression left) {
return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
return UnaryExpression.createNOT(doCreateEqual(left, BooleanConstantExpression.NULL));
}
public static BooleanExpression createNotEqual(Expression left, Expression right) {
......
......@@ -30,21 +30,6 @@ package org.apache.rocketmq.filter.expression;
*/
public class ConstantExpression implements Expression {
static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
public BooleanConstantExpression(Object value) {
super(value);
}
public boolean matches(EvaluationContext context) throws Exception {
Object object = evaluate(context);
return object != null && object == Boolean.TRUE;
}
}
public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
private Object value;
public ConstantExpression(Object value) {
......@@ -60,16 +45,10 @@ public class ConstantExpression implements Expression {
// only support Long.MIN_VALUE ~ Long.MAX_VALUE
Number value = new Long(text);
// try {
// value = new Long(text);
// } catch (NumberFormatException e) {
// // The number may be too big to fit in a long.
// value = new BigDecimal(text);
// }
long l = value.longValue();
if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
value = Integer.valueOf(value.intValue());
value = value.intValue();
}
return new ConstantExpression(value);
}
......@@ -106,7 +85,7 @@ public class ConstantExpression implements Expression {
return "NULL";
}
if (value instanceof Boolean) {
return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
return (Boolean) value ? "TRUE" : "FALSE";
}
if (value instanceof String) {
return encodeString((String) value);
......@@ -138,17 +117,19 @@ public class ConstantExpression implements Expression {
* it was provided in a selector.
*/
public static String encodeString(String s) {
StringBuffer b = new StringBuffer();
b.append('\'');
StringBuilder builder = new StringBuilder();
builder.append('\'');
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
if (c == '\'') {
b.append(c);
builder.append(c);
}
b.append(c);
builder.append(c);
}
b.append('\'');
return b.toString();
builder.append('\'');
return builder.toString();
}
}
......@@ -20,6 +20,7 @@ package org.apache.rocketmq.filter.parser;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.rocketmq.filter.expression.BooleanConstantExpression;
import org.apache.rocketmq.filter.expression.BooleanExpression;
import org.apache.rocketmq.filter.expression.ComparisonExpression;
import org.apache.rocketmq.filter.expression.ConstantExpression;
......@@ -437,15 +438,15 @@ public class SelectorParser implements SelectorParserConstants {
break;
case TRUE:
jj_consume_token(TRUE);
left = ConstantExpression.TRUE;
left = BooleanConstantExpression.TRUE;
break;
case FALSE:
jj_consume_token(FALSE);
left = ConstantExpression.FALSE;
left = BooleanConstantExpression.FALSE;
break;
case NULL:
jj_consume_token(NULL);
left = ConstantExpression.NULL;
left = BooleanConstantExpression.NULL;
break;
default:
jjLa1[12] = jjGen;
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -97,7 +97,7 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
......@@ -384,10 +384,12 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
log.info("wipe write perm of broker[{}], client: {}, {}",
requestHeader.getBrokerName(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
wipeTopicCnt);
if (ctx != null) {
log.info("wipe write perm of broker[{}], client: {}, {}",
requestHeader.getBrokerName(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
wipeTopicCnt);
}
responseHeader.setWipeTopicCount(wipeTopicCnt);
response.setCode(ResponseCode.SUCCESS);
......@@ -502,7 +504,9 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
}
private RemotingCommand updateConfig(ChannelHandlerContext ctx, RemotingCommand request) {
log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (ctx != null) {
log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
......@@ -518,13 +522,6 @@ public class DefaultRequestProcessor extends AsyncNettyRequestProcessor implemen
return response;
}
if (bodyStr == null) {
log.error("updateConfig get null body!");
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("string2Properties error");
return response;
}
Properties properties = MixAll.string2Properties(bodyStr);
if (properties == null) {
log.error("updateConfig MixAll.string2Properties error {}", bodyStr);
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -29,7 +29,7 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
......@@ -42,7 +42,7 @@
<url>git@github.com:apache/rocketmq.git</url>
<connection>scm:git:git@github.com:apache/rocketmq.git</connection>
<developerConnection>scm:git:git@github.com:apache/rocketmq.git</developerConnection>
<tag>rocketmq-all-4.7.0</tag>
<tag>HEAD</tag>
</scm>
<mailingLists>
......@@ -280,7 +280,7 @@
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.7.8</version>
<version>0.8.5</version>
<executions>
<execution>
<id>default-prepare-agent</id>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -354,6 +354,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
Collections.shuffle(addrs);
log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
this.namesrvAddrList.set(addrs);
if (!addrs.contains(this.namesrvAddrChoosed.get())) {
this.namesrvAddrChoosed.set(null);
}
}
}
}
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -1383,11 +1383,8 @@ public class CommitLog {
return nextOffset;
}
public void wakeupCustomer(final boolean flushOK) {
long endTimestamp = System.currentTimeMillis();
PutMessageStatus result = (flushOK && ((endTimestamp - this.startTimestamp) <= this.timeoutMillis)) ?
PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT;
this.flushOKFuture.complete(result);
public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
this.flushOKFuture.complete(putMessageStatus);
}
public CompletableFuture<PutMessageStatus> future() {
......@@ -1407,9 +1404,7 @@ public class CommitLog {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
this.wakeup();
}
private void swapRequests() {
......@@ -1433,7 +1428,7 @@ public class CommitLog {
}
}
req.wakeupCustomer(flushOK);
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
......
......@@ -114,6 +114,9 @@ public class DefaultMessageStore implements MessageStore {
boolean shutDownNormal = false;
private final ScheduledExecutorService diskCheckScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread"));
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
......@@ -293,7 +296,7 @@ public class DefaultMessageStore implements MessageStore {
this.shutdown = true;
this.scheduledExecutorService.shutdown();
this.diskCheckScheduledExecutorService.shutdown();
try {
Thread.sleep(1000);
......@@ -1029,6 +1032,10 @@ public class DefaultMessageStore implements MessageStore {
}
it.remove();
if (this.brokerConfig.isAutoDeleteUnusedStats()) {
this.brokerStatsManager.onTopicDeleted(topic);
}
log.info("cleanUnusedTopic: {},topic destroyed", topic);
}
}
......@@ -1325,6 +1332,11 @@ public class DefaultMessageStore implements MessageStore {
// DefaultMessageStore.this.cleanExpiredConsumerQueue();
// }
// }, 1, 1, TimeUnit.HOURS);
this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
}
}, 1000L, 10000L, TimeUnit.MILLISECONDS);
}
private void cleanFilesPeriodically() {
......@@ -1723,6 +1735,30 @@ public class DefaultMessageStore implements MessageStore {
public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
}
public boolean isSpaceFull() {
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
if (physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio);
}
if (physicRatio > this.diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full");
}
return true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok");
}
return false;
}
}
}
class CleanConsumeQueueService {
......
......@@ -39,6 +39,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageStatus;
public class HAService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
......@@ -260,9 +261,7 @@ public class HAService {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
this.wakeup();
}
public void notifyTransferSome() {
......@@ -291,7 +290,7 @@ public class HAService {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK);
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead.clear();
......
......@@ -21,6 +21,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.HashMap;
import java.util.Map;
public class WaitNotifyObject {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
......@@ -65,9 +66,9 @@ public class WaitNotifyObject {
synchronized (this) {
boolean needNotify = false;
for (Boolean value : this.waitingThreadTable.values()) {
needNotify = needNotify || !value;
value = true;
for (Map.Entry<Long,Boolean> entry : this.waitingThreadTable.entrySet()) {
needNotify = needNotify || !entry.getValue();
entry.setValue(true);
}
if (needNotify) {
......
......@@ -134,7 +134,9 @@ public class IndexFile {
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
......
......@@ -121,6 +121,26 @@ public class BrokerStatsManager {
return null;
}
public void onTopicDeleted(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).delValue(topic);
this.statsTable.get(TOPIC_PUT_SIZE).delValue(topic);
this.statsTable.get(GROUP_GET_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueByPrefixKey(topic, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueByPrefixKey(topic, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
}
public void onGroupDeleted(final String group) {
this.statsTable.get(GROUP_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_SIZE).delValueBySuffixKey(group, "@");
this.statsTable.get(SNDBCK_PUT_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(GROUP_GET_LATENCY).delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallSize.delValueBySuffixKey(group, "@");
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
public void incTopicPutNums(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
}
......
......@@ -19,8 +19,12 @@ package org.apache.rocketmq.store;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.index.IndexFile;
import org.apache.rocketmq.store.index.IndexService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Before;
......@@ -31,7 +35,9 @@ import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
......@@ -52,6 +58,7 @@ public class DefaultMessageStoreCleanFilesTest {
private SocketAddress storeHost;
private String topic = "test";
private String keys = "hello";
private int queueId = 0;
private int fileCountCommitLog = 55;
// exactly one message per CommitLog file.
......@@ -65,6 +72,43 @@ public class DefaultMessageStoreCleanFilesTest {
bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
}
@Test
public void testIsSpaceFullFunctionEmpty2Full() throws Exception {
String deleteWhen = "04";
// the min value of diskMaxUsedSpaceRatio.
int diskMaxUsedSpaceRatio = 1;
// used to set disk-full flag
double diskSpaceCleanForciblyRatio = 0.01D;
initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio);
// build and put 55 messages, exactly one message per CommitLog file.
buildAndPutMessagesToMessageStore(msgCount);
MappedFileQueue commitLogQueue = getMappedFileQueueCommitLog();
assertEquals(fileCountCommitLog, commitLogQueue.getMappedFiles().size());
int fileCountConsumeQueue = getFileCountConsumeQueue();
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
cleanCommitLogService.isSpaceFull();
assertEquals(1 << 4, messageStore.getRunningFlags().getFlagBits() & (1 << 4));
messageStore.shutdown();
messageStore.destroy();
}
@Test
public void testIsSpaceFullFunctionFull2Empty() throws Exception {
String deleteWhen = "04";
// the min value of diskMaxUsedSpaceRatio.
int diskMaxUsedSpaceRatio = 1;
//use to reset disk-full flag
double diskSpaceCleanForciblyRatio = 0.999D;
initMessageStore(deleteWhen, diskMaxUsedSpaceRatio, diskSpaceCleanForciblyRatio);
//set disk full
messageStore.getRunningFlags().getAndMakeDiskFull();
cleanCommitLogService.isSpaceFull();
assertEquals(0, messageStore.getRunningFlags().getFlagBits() & (1 << 4));
}
@Test
public void testDeleteExpiredFilesByTimeUp() throws Exception {
String deleteWhen = Calendar.getInstance().get(Calendar.HOUR_OF_DAY) + "";
......@@ -87,6 +131,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
......@@ -101,6 +148,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) expectDeletedCount / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -126,6 +177,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
......@@ -140,6 +194,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) expectDeletedCount / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -165,6 +223,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
// In this case, there is no need to expire the files.
// int expireFileCount = 15;
// expireFiles(commitLogQueue, expireFileCount);
......@@ -181,6 +242,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) (a * 10) / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 10) / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -208,6 +273,9 @@ public class DefaultMessageStoreCleanFilesTest {
MappedFileQueue consumeQueue = getMappedFileQueueConsumeQueue();
assertEquals(fileCountConsumeQueue, consumeQueue.getMappedFiles().size());
int fileCountIndexFile = getFileCountIndexFile();
assertEquals(fileCountIndexFile, getIndexFileList().size());
int expireFileCount = 15;
expireFiles(commitLogQueue, expireFileCount);
......@@ -222,6 +290,10 @@ public class DefaultMessageStoreCleanFilesTest {
int msgCountPerFile = getMsgCountPerConsumeQueueMappedFile();
int expectDeleteCountConsumeQueue = (int) Math.floor((double) expectDeletedCount / msgCountPerFile);
assertEquals(fileCountConsumeQueue - expectDeleteCountConsumeQueue, consumeQueue.getMappedFiles().size());
int msgCountPerIndexFile = getMsgCountPerIndexFile();
int expectDeleteCountIndexFile = (int) Math.floor((double) (a * 10) / msgCountPerIndexFile);
assertEquals(fileCountIndexFile - expectDeleteCountIndexFile, getIndexFileList().size());
}
}
......@@ -274,32 +346,60 @@ public class DefaultMessageStoreCleanFilesTest {
return fileQueue;
}
private ArrayList<IndexFile> getIndexFileList() throws Exception {
Field indexServiceField = messageStore.getClass().getDeclaredField("indexService");
indexServiceField.setAccessible(true);
IndexService indexService = (IndexService) indexServiceField.get(messageStore);
Field indexFileListField = indexService.getClass().getDeclaredField("indexFileList");
indexFileListField.setAccessible(true);
ArrayList<IndexFile> indexFileList = (ArrayList<IndexFile>) indexFileListField.get(indexService);
return indexFileList;
}
private int getFileCountConsumeQueue() {
int countPerFile = getMsgCountPerConsumeQueueMappedFile();
double fileCount = (double) msgCount / countPerFile;
return (int) Math.ceil(fileCount);
}
private int getFileCountIndexFile() {
int countPerFile = getMsgCountPerIndexFile();
double fileCount = (double) msgCount / countPerFile;
return (int) Math.ceil(fileCount);
}
private int getMsgCountPerConsumeQueueMappedFile() {
int size = messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueue();
return size / CQ_STORE_UNIT_SIZE;// 7 in this case
}
private int getMsgCountPerIndexFile() {
// 7 in this case
return messageStore.getMessageStoreConfig().getMaxIndexNum() - 1;
}
private void buildAndPutMessagesToMessageStore(int msgCount) throws Exception {
int msgLen = topic.getBytes(CHARSET_UTF8).length + 91;
Map<String, String> properties = new HashMap<>(4);
properties.put(MessageConst.PROPERTY_KEYS, keys);
String s = MessageDecoder.messageProperties2String(properties);
int propertiesLen = s.getBytes(CHARSET_UTF8).length;
int commitLogEndFileMinBlankLength = 4 + 4;
int singleMsgBodyLen = mappedFileSize - msgLen - commitLogEndFileMinBlankLength;
int singleMsgBodyLen = mappedFileSize - msgLen - propertiesLen - commitLogEndFileMinBlankLength;
for (int i = 0; i < msgCount; i++) {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
msg.setBody(new byte[singleMsgBodyLen]);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setKeys(keys);
msg.setQueueId(queueId);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(storeHost);
msg.setBornHost(bornHost);
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
PutMessageResult result = messageStore.putMessage(msg);
assertTrue(result != null && result.isOk());
}
......@@ -324,8 +424,8 @@ public class DefaultMessageStoreCleanFilesTest {
MessageStoreConfig messageStoreConfig = new MessageStoreConfigForTest();
messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
messageStoreConfig.setMappedFileSizeConsumeQueue(mappedFileSize);
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(8);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package stats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_FALL_TIME;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_LATENCY;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.GROUP_GET_SIZE;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.SNDBCK_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
public class BrokerStatsManagerTest {
private BrokerStatsManager brokerStatsManager;
private String TOPIC = "TOPIC_TEST";
private String GROUP_NAME = "GROUP_TEST";
@Before
public void init() {
brokerStatsManager = new BrokerStatsManager("DefaultCluster");
brokerStatsManager.start();
}
@After
public void destory() {
brokerStatsManager.shutdown();
}
@Test
public void testGetStatsItem() {
assertThat(brokerStatsManager.getStatsItem("TEST", "TEST")).isNull();
}
@Test
public void testIncTopicPutNums() {
brokerStatsManager.incTopicPutNums(TOPIC);
assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getTimes().doubleValue()).isEqualTo(1L);
brokerStatsManager.incTopicPutNums(TOPIC, 2, 2);
assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC).getValue().doubleValue()).isEqualTo(3L);
}
@Test
public void testIncTopicPutSize() {
brokerStatsManager.incTopicPutSize(TOPIC, 2);
assertThat(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC).getValue().doubleValue()).isEqualTo(2L);
}
@Test
public void testIncGroupGetNums() {
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncGroupGetSize() {
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 1);
String statsKey = brokerStatsManager.buildStatsKey(TOPIC, GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncGroupGetLatency() {
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
String statsKey = String.format("%d@%s@%s", 1, TOPIC, GROUP_NAME);
assertThat(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, statsKey).getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testIncBrokerPutNums() {
brokerStatsManager.incBrokerPutNums();
assertThat(brokerStatsManager.getStatsItem(BROKER_PUT_NUMS, "DefaultCluster").getValue().doubleValue()).isEqualTo(1L);
}
@Test
public void testOnTopicDeleted() {
brokerStatsManager.incTopicPutNums(TOPIC);
brokerStatsManager.incTopicPutSize(TOPIC, 100);
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.onTopicDeleted(TOPIC);
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_NUMS, TOPIC));
Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_SIZE, TOPIC));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
}
@Test
public void testOnGroupDeleted(){
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.onGroupDeleted(GROUP_NAME);
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_SIZE, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(SNDBCK_PUT_NUMS, TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE, "1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME, "1@" + TOPIC + "@" + GROUP_NAME));
}
}
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -64,8 +64,9 @@ public class RMQOrderListener extends AbstractListener implements MessageListene
return String.format("%s_%s", queueId, brokerIp);
}
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
if (isDebug) {
if (listenerName != null && listenerName != "") {
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.7.0</version>
<version>4.7.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......