未验证 提交 8eb275f5 编写于 作者: S sunhangda 提交者: GitHub

Merge branch 'apache:develop' into develop

...@@ -251,12 +251,6 @@ public class PlainPermissionManager { ...@@ -251,12 +251,6 @@ public class PlainPermissionManager {
} }
public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) { public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
if (globalWhiteAddrsList == null) {
log.error("Parameter value globalWhiteAddrsList is null,Please check your parameter");
return false;
}
Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class); Map.class);
if (aclAccessConfigMap == null || aclAccessConfigMap.isEmpty()) { if (aclAccessConfigMap == null || aclAccessConfigMap.isEmpty()) {
...@@ -266,9 +260,10 @@ public class PlainPermissionManager { ...@@ -266,9 +260,10 @@ public class PlainPermissionManager {
if (globalWhiteRemoteAddrList != null) { if (globalWhiteRemoteAddrList != null) {
globalWhiteRemoteAddrList.clear(); globalWhiteRemoteAddrList.clear();
if (globalWhiteAddrsList != null) {
globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList); globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList);
}
// Update globalWhiteRemoteAddr element in memeory map firstly // Update globalWhiteRemoteAddr element in memory map firstly
aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS, globalWhiteRemoteAddrList); aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS, globalWhiteRemoteAddrList);
if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) { if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
return true; return true;
......
...@@ -53,6 +53,11 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil ...@@ -53,6 +53,11 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
return transferred; return transferred;
} }
@Override
public long transferred() {
return transferred;
}
@Override @Override
public long count() { public long count() {
return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize(); return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
...@@ -76,6 +81,28 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil ...@@ -76,6 +81,28 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
return 0; return 0;
} }
@Override
public FileRegion retain() {
super.retain();
return this;
}
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FileRegion touch() {
return this;
}
@Override
public FileRegion touch(Object hint) {
return this;
}
public void close() { public void close() {
this.deallocate(); this.deallocate();
} }
......
...@@ -47,6 +47,11 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File ...@@ -47,6 +47,11 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
return transferred; return transferred;
} }
@Override
public long transferred() {
return transferred;
}
@Override @Override
public long count() { public long count() {
return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize(); return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize();
...@@ -65,6 +70,28 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File ...@@ -65,6 +70,28 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
return 0; return 0;
} }
@Override
public FileRegion retain() {
super.retain();
return this;
}
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FileRegion touch() {
return this;
}
@Override
public FileRegion touch(Object hint) {
return this;
}
public void close() { public void close() {
this.deallocate(); this.deallocate();
} }
......
...@@ -53,6 +53,11 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi ...@@ -53,6 +53,11 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
return transferred; return transferred;
} }
@Override
public long transferred() {
return transferred;
}
@Override @Override
public long count() { public long count() {
return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize(); return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize();
...@@ -76,6 +81,28 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi ...@@ -76,6 +81,28 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
return 0; return 0;
} }
@Override
public FileRegion retain() {
super.retain();
return this;
}
@Override
public FileRegion retain(int increment) {
super.retain(increment);
return this;
}
@Override
public FileRegion touch() {
return this;
}
@Override
public FileRegion touch(Object hint) {
return this;
}
public void close() { public void close() {
this.deallocate(); this.deallocate();
} }
......
...@@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore { ...@@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
} }
@Override @Override
public boolean appendToCommitLog(long startOffset, byte[] data) { public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
return next.appendToCommitLog(startOffset, data); return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
} }
@Override @Override
......
...@@ -178,7 +178,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -178,7 +178,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); Integer times = requestHeader.getMaxReconsumeTimes();
if (times != null) {
maxReconsumeTimes = times;
}
} }
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
......
...@@ -178,7 +178,7 @@ public class ClientConfig { ...@@ -178,7 +178,7 @@ public class ClientConfig {
} }
public String getNamesrvAddr() { public String getNamesrvAddr() {
if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr.trim())) { if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.NAMESRV_ENDPOINT_PATTERN.matcher(namesrvAddr.trim()).matches()) {
return NameServerAddressUtils.getNameSrvAddrFromNamesrvEndpoint(namesrvAddr); return NameServerAddressUtils.getNameSrvAddrFromNamesrvEndpoint(namesrvAddr);
} }
return namesrvAddr; return namesrvAddr;
......
...@@ -236,11 +236,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -236,11 +236,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private boolean unitMode = false; private boolean unitMode = false;
/** /**
* Max re-consume times. -1 means 16 times. * Max re-consume times.
* </p> * In concurrently mode, -1 means 16;
* In orderly mode, -1 means Integer.MAX_VALUE.
* *
* If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion * If messages are re-consumed more than {@link #maxReconsumeTimes} before success.
* queue waiting.
*/ */
private int maxReconsumeTimes = -1; private int maxReconsumeTimes = -1;
......
...@@ -110,32 +110,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService ...@@ -110,32 +110,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override @Override
public void incCorePoolSize() { public void incCorePoolSize() {
// long corePoolSize = this.consumeExecutor.getCorePoolSize();
// if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
// {
// this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
// + 1);
// }
// log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
// {}",
// corePoolSize,
// this.consumeExecutor.getCorePoolSize(),
// this.consumerGroup);
} }
@Override @Override
public void decCorePoolSize() { public void decCorePoolSize() {
// long corePoolSize = this.consumeExecutor.getCorePoolSize();
// if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
// {
// this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
// - 1);
// }
// log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
// {}",
// corePoolSize,
// this.consumeExecutor.getCorePoolSize(),
// this.consumerGroup);
} }
@Override @Override
...@@ -411,11 +391,11 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService ...@@ -411,11 +391,11 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
} }
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e), RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup, ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs, msgs,
messageQueue); messageQueue), e);
hasException = true; hasException = true;
} }
long consumeRT = System.currentTimeMillis() - beginTimestamp; long consumeRT = System.currentTimeMillis() - beginTimestamp;
......
...@@ -487,11 +487,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { ...@@ -487,11 +487,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) { } catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e), RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup, ConsumeMessageOrderlyService.this.consumerGroup,
msgs, msgs,
messageQueue); messageQueue), e);
hasException = true; hasException = true;
} finally { } finally {
this.processQueue.getConsumeLock().unlock(); this.processQueue.getConsumeLock().unlock();
......
...@@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first."; private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive."; private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
/** /**
* the type of subscription * the type of subscription
*/ */
...@@ -195,19 +195,21 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -195,19 +195,21 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
private void checkServiceState() { private void checkServiceState() {
if (this.serviceState != ServiceState.RUNNING) if (this.serviceState != ServiceState.RUNNING) {
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
} }
}
public void updateNameServerAddr(String newAddresses) { public void updateNameServerAddr(String newAddresses) {
this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses); this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses);
} }
private synchronized void setSubscriptionType(SubscriptionType type) { private synchronized void setSubscriptionType(SubscriptionType type) {
if (this.subscriptionType == SubscriptionType.NONE) if (this.subscriptionType == SubscriptionType.NONE) {
this.subscriptionType = type; this.subscriptionType = type;
else if (this.subscriptionType != type) } else if (this.subscriptionType != type) {
throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE); throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
}
} }
private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) { private void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue) {
...@@ -464,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -464,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, String subExpression) throws MQClientException { public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) { if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty."); throw new IllegalArgumentException("Topic can not be null or empty.");
} }
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
...@@ -483,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -483,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try { try {
if (topic == null || topic.equals("")) { if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty."); throw new IllegalArgumentException("Topic can not be null or empty.");
} }
setSubscriptionType(SubscriptionType.SUBSCRIBE); setSubscriptionType(SubscriptionType.SUBSCRIBE);
...@@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized List<MessageExt> poll(long timeout) { public synchronized List<MessageExt> poll(long timeout) {
try { try {
checkServiceState(); checkServiceState();
if (timeout < 0) if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative"); throw new IllegalArgumentException("Timeout must not be negative");
}
if (defaultLitePullConsumer.isAutoCommit()) { if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit(); maybeAutoCommit();
...@@ -546,10 +549,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -546,10 +549,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (endTime - System.currentTimeMillis() > 0) { if (endTime - System.currentTimeMillis() > 0) {
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (endTime - System.currentTimeMillis() <= 0) if (endTime - System.currentTimeMillis() <= 0) {
break; break;
} }
} }
}
if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
List<MessageExt> messages = consumeRequest.getMessageExts(); List<MessageExt> messages = consumeRequest.getMessageExts();
...@@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException { public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState(); checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2) if (offset == -2) {
throw new MQClientException("Fetch consume offset from broker exception", null); throw new MQClientException("Fetch consume offset from broker exception", null);
}
return offset; return offset;
} }
...@@ -683,10 +688,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -683,10 +688,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
} }
Iterator<ConsumeRequest> iter = consumeRequestCache.iterator(); Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
if (iter.next().getMessageQueue().equals(messageQueue)) if (iter.next().getMessageQueue().equals(messageQueue)) {
iter.remove(); iter.remove();
} }
} }
}
private long nextPullOffset(MessageQueue messageQueue) throws MQClientException { private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
long offset = -1; long offset = -1;
...@@ -735,10 +741,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -735,10 +741,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return; return;
} }
if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) { if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes); log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
}
return; return;
} }
...@@ -778,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -778,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long offset = 0L; long offset = 0L;
try { try {
offset = nextPullOffset(messageQueue); offset = nextPullOffset(messageQueue);
} catch (MQClientException e) { } catch (Exception e) {
log.error("Failed to get next pull offset", e); log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return; return;
...@@ -790,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { ...@@ -790,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long pullDelayTimeMills = 0; long pullDelayTimeMills = 0;
try { try {
SubscriptionData subscriptionData; SubscriptionData subscriptionData;
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
String topic = this.messageQueue.getTopic(); String topic = this.messageQueue.getTopic();
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else { } else {
String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL); subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
} }
......
...@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { ...@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L; long offset = -1L;
try { try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue()); offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
} catch (MQClientException e) { } catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e); log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return; return;
......
...@@ -378,7 +378,7 @@ public abstract class RebalanceImpl { ...@@ -378,7 +378,7 @@ public abstract class RebalanceImpl {
long nextOffset = -1L; long nextOffset = -1L;
try { try {
nextOffset = this.computePullFromWhereWithException(mq); nextOffset = this.computePullFromWhereWithException(mq);
} catch (MQClientException e) { } catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue; continue;
} }
......
...@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl { ...@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try { try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) { } catch (MQClientException e) {
result = -1; log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
throw e;
} }
} }
} else { } else {
......
...@@ -60,7 +60,6 @@ public class TraceDataEncoderTest { ...@@ -60,7 +60,6 @@ public class TraceDataEncoderTest {
Assert.assertEquals(contexts.get(0).getTraceType(), TraceType.Pub); Assert.assertEquals(contexts.get(0).getTraceType(), TraceType.Pub);
} }
@Test @Test
public void testEncoderFromContextBean() { public void testEncoderFromContextBean() {
TraceContext context = new TraceContext(); TraceContext context = new TraceContext();
...@@ -130,4 +129,107 @@ public class TraceDataEncoderTest { ...@@ -130,4 +129,107 @@ public class TraceDataEncoderTest {
Assert.assertEquals(before.getTransactionState(), after.getTransactionState()); Assert.assertEquals(before.getTransactionState(), after.getTransactionState());
Assert.assertEquals(before.isFromTransactionCheck(), after.isFromTransactionCheck()); Assert.assertEquals(before.isFromTransactionCheck(), after.isFromTransactionCheck());
} }
@Test
public void testPubTraceDataFormatTest() {
TraceContext pubContext = new TraceContext();
pubContext.setTraceType(TraceType.Pub);
pubContext.setTimeStamp(time);
pubContext.setRegionId("Default-region");
pubContext.setGroupName("GroupName-test");
pubContext.setCostTime(34);
pubContext.setSuccess(true);
TraceBean bean = new TraceBean();
bean.setTopic("topic-test");
bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
bean.setTags("tags");
bean.setKeys("keys");
bean.setStoreHost("127.0.0.1:10911");
bean.setBodyLength(100);
bean.setMsgType(MessageType.Normal_Msg);
bean.setOffsetMsgId("AC1415116D1418B4AAC217FE1B4E0000");
pubContext.setTraceBeans(new ArrayList<TraceBean>(1));
pubContext.getTraceBeans().add(bean);
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(pubContext);
String transData = traceTransferBean.getTransData();
Assert.assertNotNull(transData);
String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
Assert.assertEquals(14, items.length);
}
@Test
public void testSubBeforeTraceDataFormatTest() {
TraceContext subBeforeContext = new TraceContext();
subBeforeContext.setTraceType(TraceType.SubBefore);
subBeforeContext.setTimeStamp(time);
subBeforeContext.setRegionId("Default-region");
subBeforeContext.setGroupName("GroupName-test");
subBeforeContext.setRequestId("3455848576927");
TraceBean bean = new TraceBean();
bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
bean.setRetryTimes(0);
bean.setKeys("keys");
subBeforeContext.setTraceBeans(new ArrayList<TraceBean>(1));
subBeforeContext.getTraceBeans().add(bean);
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(subBeforeContext);
String transData = traceTransferBean.getTransData();
Assert.assertNotNull(transData);
String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
Assert.assertEquals(8, items.length);
}
@Test
public void testSubAfterTraceDataFormatTest() {
TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);
subAfterContext.setRequestId("3455848576927");
subAfterContext.setCostTime(20);
subAfterContext.setSuccess(true);
subAfterContext.setContextCode(98623046);
TraceBean bean = new TraceBean();
bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
bean.setKeys("keys");
subAfterContext.setTraceBeans(new ArrayList<TraceBean>(1));
subAfterContext.getTraceBeans().add(bean);
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(subAfterContext);
String transData = traceTransferBean.getTransData();
Assert.assertNotNull(transData);
String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
Assert.assertEquals(7, items.length);
}
@Test
public void testEndTrxTraceDataFormatTest() {
TraceContext endTrxContext = new TraceContext();
endTrxContext.setTraceType(TraceType.EndTransaction);
endTrxContext.setGroupName("PID-test");
endTrxContext.setRegionId("DefaultRegion");
endTrxContext.setTimeStamp(time);
TraceBean endTrxTraceBean = new TraceBean();
endTrxTraceBean.setTopic("topic-test");
endTrxTraceBean.setKeys("Keys");
endTrxTraceBean.setTags("Tags");
endTrxTraceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
endTrxTraceBean.setStoreHost("127.0.0.1:10911");
endTrxTraceBean.setMsgType(MessageType.Trans_msg_Commit);
endTrxTraceBean.setTransactionId("transactionId");
endTrxTraceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
endTrxTraceBean.setFromTransactionCheck(false);
List<TraceBean> traceBeans = new ArrayList<TraceBean>();
traceBeans.add(endTrxTraceBean);
endTrxContext.setTraceBeans(traceBeans);
TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(endTrxContext);
String transData = traceTransferBean.getTransData();
Assert.assertNotNull(transData);
String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
Assert.assertEquals(13, items.length);
}
} }
...@@ -20,6 +20,7 @@ public class NameServerAddressUtils { ...@@ -20,6 +20,7 @@ public class NameServerAddressUtils {
public static final String INSTANCE_PREFIX = "MQ_INST_"; public static final String INSTANCE_PREFIX = "MQ_INST_";
public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+"; public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+";
public static final String ENDPOINT_PREFIX = "(\\w+://|)"; public static final String ENDPOINT_PREFIX = "(\\w+://|)";
public static final Pattern NAMESRV_ENDPOINT_PATTERN = Pattern.compile("^http://.*");
public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*"); public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*");
public static String getNameServerAddresses() { public static String getNameServerAddresses() {
......
# 部署架构和设置步骤
## 集群的设置
### 1 单master模式
这是最简单但也是最危险的模式,一旦broker服务器重启或宕机,整个服务将不可用。 建议在生产环境中不要使用这种部署方式,在本地测试和开发可以选择这种模式。 以下是构建的步骤。
**1)启动NameServer**
```shell
### 第一步启动namesrv
$ nohup sh mqnamesrv &
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
我们可以在namesrv.log 中看到'The Name Server boot success..',表示NameServer 已成功启动。
**2)启动Broker**
```shell
### 第一步先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a,192.169.1.2:10911] boot success...
```
我们可以在 Broker.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。
### 2 多Master模式
该模式是指所有节点都是master主节点(比如2个或3个主节点),没有slave从节点的模式。 这种模式的优缺点如下:
- 优点:
1. 配置简单。
2. 一个master节点的宕机或者重启(维护)对应用程序没有影响。
3. 当磁盘配置为RAID10时,消息不会丢失,因为RAID10磁盘非常可靠,即使机器不可恢复(消息异步刷盘模式的情况下,会丢失少量消息;如果消息是同步刷盘模式,不会丢失任何消息)。
4. 在这种模式下,性能是最高的。
- 缺点:
1. 单台机器宕机时,本机未消费的消息,直到机器恢复后才会订阅,影响消息实时性。
多Master模式的启动步骤如下:
**1)启动 NameServer**
```shell
### 第一步先启动broker
$ nohup sh mqnamesrv &
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
**2)启动 Broker 集群**
```shell
### 比如在A机器上启动第一个Master,假设配置的NameServer IP为:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### 然后在机器B上启动第二个Master,假设配置的NameServer IP是:192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...
```
上面显示的boot命令用于单个NameServer的情况。对于多个NameServer的集群,broker boot命令中-n参数后面的地址列表用分号隔开,例如 192.168.1.1 : 9876; 192.161.2 : 9876
### 3 多Master多Slave模式-异步复制
每个主节点配置多个从节点,多对主从。HA采用异步复制,主节点和从节点之间有短消息延迟(毫秒)。这种模式的优缺点如下:
- 优点:
1. 即使磁盘损坏,也不会丢失极少的消息,不影响消息的实时性能。
2. 同时,当主节点宕机时,消费者仍然可以消费从节点的消息,这个过程对应用本身是透明的,不需要人为干预。
3. 性能几乎与多Master模式一样高。
- 缺点:
1. 主节点宕机、磁盘损坏时,会丢失少量消息。
多主多从模式的启动步骤如下:
**1)启动 NameServer**
```shell
### 第一步先启动broker
$ nohup sh mqnamesrv &
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
**2)启动 Broker 集群**
```shell
### 例如在A机器上启动第一个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 然后在机器B上启动第二个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 然后在C机器上启动第一个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 最后在D机启动第二个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
```
上图显示了 2M-2S-Async 模式的启动命令,类似于其他 nM-nS-Async 模式。
### 4 多Master多Slave模式-同步双写
这种模式下,每个master节点配置多个slave节点,有多对Master-Slave。HA采用同步双写,即只有消息成功写入到主节点并复制到多个从节点,才会返回成功响应给应用程序。
这种模式的优缺点如下:
- 优点:
1. 数据和服务都没有单点故障。
2. 在master节点关闭的情况下,消息也没有延迟。
3. 服务可用性和数据可用性非常高;
- 缺点:
1. 这种模式下的性能略低于异步复制模式(大约低 10%)。
2. 发送单条消息的RT略高,目前版本,master节点宕机后,slave节点无法自动切换到master。
启动步骤如下:
**1)启动NameServer**
```shell
### 第一步启动broker
$ nohup sh mqnamesrv &
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
**2)启动 Broker 集群**
```shell
### 例如在A机器上启动第一个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
### 然后在B机器上启动第二个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
### 然后在C机器上启动第一个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
### 最后在D机启动第二个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
```
上述Master和Slave是通过指定相同的config命名为“brokerName”来配对的,master节点的brokerId必须为0,slave节点的brokerId必须大于0。
\ No newline at end of file
# 批量消息发送
批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。
### 1 发送批量消息
如果你一次只发送不超过 4MiB 的消息,使用批处理很容易:
```java
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
```
### 2 拆分
当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息:
```java
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
return tmpSize;
}
}
// then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// handle the error
}
}
```
\ No newline at end of file
# Schedule example
### 1 启动消费者等待传入的订阅消息
```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// Subscribe topics
consumer.subscribe("TestTopic", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch consumer
consumer.start();
}
}
```
### 2 发送延迟消息
```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
```
### 3 确认
您应该会看到消息在其存储时间后大约 10 秒被消耗。
### 4 延迟消息的使用场景
例如在电子商务中,如果提交订单,可以发送延迟消息,1小时后可以查看订单状态。 如果订单仍未付款,则可以取消订单并释放库存。
### 5 使用延迟消息的限制
```java
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
```
当前 RocketMQ 不支持任意时间的延迟。 生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。
See `SendMessageProcessor.java`
# Basic Sample
------
基本示例中提供了以下两个功能
* RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
* RocketMQ可以用来消费消息。
### 1 添加依赖
maven:
``` java
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
```
gradle:
``` java
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
```
### 2 发送消息
##### 2.1 使用Producer发送同步消息
可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
``` java
public class SyncProducer {
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("localhost:9876");
// Launch the producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send message to one of brokers
SendResult sendResult = producer.send(msg);
// Check whether the message has been delivered by the callback of sendResult
System.out.printf("%s%n", sendResult);
}
// Shut down once the producer instance is not longer in use
producer.shutdown();
}
}
```
##### 2.2 发送异步消息
异步传输通常用于响应时间敏感的业务场景。这意味着发送方无法等待代理的响应太长时间。
``` java
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("localhost:9876");
// Launch the producer instance
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback: receive the callback of the asynchronous return result.
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// Shut down once the producer instance is not longer in use
producer.shutdown();
}
}
```
##### 2.3 以单向模式发送消息
单向传输用于需要中等可靠性的情况,如日志收集。
``` java
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// Instantiate with a producer group name
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses
producer.setNamesrvAddr("localhost:9876");
// Launch the producer instance
producer.start();
for (int i = 0; i < 100; i++) {
// Create a message instance with specifying topic, tag and message body
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Send in one-way mode, no return result
producer.sendOneway(msg);
}
// Shut down once the producer instance is not longer in use
producer.shutdown();
}
}
```
### 3 消费消息
``` java
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one or more topics and tags for finding those messages need to be consumed
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// Mark the message that have been consumed successfully
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch the consumer instance
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
```
\ No newline at end of file
# 经常被问到的问题
以下是关于RocketMQ项目的常见问题
## 1 基本
1. **为什么我们要使用RocketMQ而不是选择其他的产品?**
请参考[为什么要选择RocketMQ](http://rocketmq.apache.org/docs/motivation/)
2. **我是否需要安装其他的软件才能使用RocketMQ,例如zookeeper?**
不需要,RocketMQ可以独立的运行。
## 2 使用
1. **新创建的Consumer ID从哪里开始消费消息?**
1)如果发送的消息在三天之内,那么消费者会从服务器中保存的第一条消息开始消费。
2)如果发送的消息已经超过三天,则消费者会从服务器中的最新消息开始消费,也就是从队列的尾部开始消费。
3)如果消费者重新启动,那么它会从最后一个消费位置开始消费消息。
2. **当消费失败的时候如何重新消费消息?**
1)在集群模式下,消费的业务逻辑代码会返回Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试16次,之后该消息会被丢弃。
2)在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。
3. **当消费失败的时候如何找到失败的消息?**
1)使用按时间的主题查询,可以查询到一段时间内的消息。
2)使用主题和消息ID来准确查询消息。
3)使用主题和消息的Key来准确查询所有消息Key相同的消息。
4. **消息只会被传递一次吗?**
RocketMQ 确保所有消息至少传递一次。 在大多数情况下,消息不会重复。
5. **如何增加一个新的Broker?**
1)启动一个新的Broker并将其注册到name server中的Broker列表里。
2)默认只自动创建内部系统topic和consumer group。 如果您希望在新节点上拥有您的业务主题和消费者组,请从现有的Broker中复制它们。 我们提供了管理工具和命令行来处理此问题。
## 3 配置相关
以下回答均为默认值,可通过配置修改。
1. **消息在服务器上可以保存多长时间?**
存储的消息将最多保存 3 天,超过 3 天未使用的消息将被删除。
2. **消息体的大小限制是多少?**
通常是256KB
3. **怎么设置消费者线程数?**
当你启动消费者的时候,可以设置 ConsumeThreadNums属性的值,举例如下:
```java
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
```
## 4 错误
1. **当你启动一个生产者或消费者的过程失败了并且错误信息是生产者组或消费者重复**
原因:使用同一个Producer/Consumer Group在同一个JVM中启动多个Producer/Consumer实例可能会导致客户端无法启动。
解决方案:确保一个 Producer/Consumer Group 对应的 JVM 只启动一个 Producer/Consumer 实例。
2. **消费者无法在广播模式下开始加载 json 文件**
原因:fastjson 版本太低,无法让广播消费者加载本地 offsets.json,导致消费者启动失败。 损坏的 fastjson 文件也会导致同样的问题。
解决方案:Fastjson 版本必须升级到 RocketMQ 客户端依赖版本,以确保可以加载本地 offsets.json。 默认情况下,offsets.json 文件在 /home/{user}/.rocketmq_offsets 中。 或者检查fastjson的完整性。
3. **Broker崩溃以后有什么影响?**
1)Master节点崩溃
消息不能再发送到该Broker集群,但是如果您有另一个可用的Broker集群,那么在主题存在的条件下仍然可以发送消息。消息仍然可以从Slave节点消费。
2)一些Slave节点崩溃
只要有另一个工作的slave,就不会影响发送消息。 对消费消息也不会产生影响,除非消费者组设置为优先从该Slave消费。 默认情况下,消费者组从 master 消费。
3)所有Slave节点崩溃
向master发送消息不会有任何影响,但是,如果master是SYNC_MASTER,producer会得到一个SLAVE_NOT_AVAILABLE,表示消息没有发送给任何slave。 对消费消息也没有影响,除非消费者组设置为优先从slave消费。 默认情况下,消费者组从master消费。
4. **Producer提示“No Topic Route Info”,如何诊断?**
当您尝试将消息发送到一个路由信息对生产者不可用的主题时,就会发生这种情况。
1)确保生产者可以连接到名称服务器并且能够从中获取路由元信息。
2)确保名称服务器确实包含主题的路由元信息。 您可以使用管理工具或 Web 控制台通过 topicRoute 从名称服务器查询路由元信息。
3)确保您的Broker将心跳发送到您的生产者正在连接的同一name server列表。
4)确保主题的权限为6(rw-),或至少为2(-w-)。
如果找不到此主题,请通过管理工具命令updateTopic或Web控制台在Broker上创建它。
\ No newline at end of file
...@@ -186,8 +186,8 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 ...@@ -186,8 +186,8 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
| brokerName | null | broker 的名称 | | brokerName | null | broker 的名称 |
| brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 | | brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
| brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave | | brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
| storePathRootDir | $HOME/store/ | 存储根路径 |
| storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 | | storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
| storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |​ | mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |​
| deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |​ | deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |​
| fileReservedTime | 72 | 以小时计算的文件保留时间 |​ | fileReservedTime | 72 | 以小时计算的文件保留时间 |​
......
...@@ -100,6 +100,19 @@ RocketMQ的消息轨迹特性支持两种存储轨迹数据的方式: ...@@ -100,6 +100,19 @@ RocketMQ的消息轨迹特性支持两种存储轨迹数据的方式:
...... ......
``` ```
### 4.4 使用mqadmin命令发送和查看轨迹
- 发送消息
```shell
./mqadmin sendMessage -m true --topic some-topic-name -n 127.0.0.1:9876 -p "your meesgae content"
```
- 查询轨迹
```shell
./mqadmin QueryMsgTraceById -n 127.0.0.1:9876 -i "some-message-id"
```
- 查询轨迹结果
```
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Type #ProducerGroup #ClientHost #SendTime #CostTimes #Status
Pub 1623305799667 xxx.xxx.xxx.xxx 2021-06-10 14:16:40 131ms success
```
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
| brokerName | null | broker name | | brokerName | null | broker name |
| brokerClusterName | DefaultCluster | this broker belongs to which cluster | | brokerClusterName | DefaultCluster | this broker belongs to which cluster |
| brokerId | 0 | broker id, 0 means master, positive integers mean slave | | brokerId | 0 | broker id, 0 means master, positive integers mean slave |
| storePathRootDir | $HOME/store/ | file path for root store |
| storePathCommitLog | $HOME/store/commitlog/ | file path for commit log | | storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
| storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​ | mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​
| deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |​ | deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |​
| fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |​ | fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |​
......
## DefaultMQProducer
---
### Class introduction
`public class DefaultMQProducer
extends ClientConfig
implements MQProducer`
>`DefaultMQProducer` is the entry point for an application to post messages, out of the box,ca quickly create a producer with a no-argument construction. it is mainly responsible for message sending, support synchronous、asynchronous、one-way send. All of these send methods support batch send. The parameters of the sender can be adjusted through the getter/setter methods , provided by this class. `DefaultMQProducer` has multi send method and each method is slightly different. Make sure you know the usage before you use it . Blow is a producer example . [see more examples](https://github.com/apache/rocketmq/blob/master/example/src/main/java/org/apache/rocketmq/example/)。
``` java
public class Producer {
public static void main(String[] args) throws MQClientException {
// create a produce with producer_group_name
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// start the producer
producer.start();
for (int i = 0; i < 128; i++)
try {
// construct the msg
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// send sync
SendResult sendResult = producer.send(msg);
// print the result
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
```
**Note** : This class is thread safe. It can be safely shared between multiple threads after configuration and startup is complete.
### Variable
|Type|Name| description |
|------|-------|-------|
|DefaultMQProducerImpl|defaultMQProducerImpl|The producer's internal default implementation|
|String|producerGroup|The producer's group|
|String|createTopicKey| Topics that do not exist on the server are automatically created when the message is sent |
|int|defaultTopicQueueNums|The default number of queues to create a topic|
|int|sendMsgTimeout|The timeout for the message to be sent|
|int|compressMsgBodyOverHowmuch|the threshold of the compress of message body|
|int|retryTimesWhenSendFailed|Maximum number of internal attempts to send a message in synchronous mode|
|int|retryTimesWhenSendAsyncFailed|Maximum number of internal attempts to send a message in asynchronous mode|
|boolean|retryAnotherBrokerWhenNotStoreOK|Whether to retry another broker if an internal send fails|
|int|maxMessageSize| Maximum length of message |
|TraceDispatcher|traceDispatcher| Message trackers. Use rcpHook to track messages |
### construction method
|方法名称|方法描述|
|-------|------------|
|DefaultMQProducer()| creates a producer with default parameter values |
|DefaultMQProducer(final String producerGroup)| creates a producer with producer group name. |
|DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)|creates a producer with producer group name and set whether to enable message tracking|
|DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)|creates a producer with producer group name and set whether to enable message tracking、the trace topic.|
|DefaultMQProducer(RPCHook rpcHook)|creates a producer with a rpc hook.|
|DefaultMQProducer(final String producerGroup, RPCHook rpcHook)|creates a producer with a rpc hook and producer group.|
|DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)|all of above.|
...@@ -101,5 +101,21 @@ Adjusting instantiation of DefaultMQProducer and DefaultMQPushConsumer as follow ...@@ -101,5 +101,21 @@ Adjusting instantiation of DefaultMQProducer and DefaultMQPushConsumer as follow
``` ```
### 4.4 Send and query message trace by mqadmin command
- send message
```shell
./mqadmin sendMessage -m true --topic some-topic-name -n 127.0.0.1:9876 -p "your meesgae content"
```
- query trace
```shell
./mqadmin QueryMsgTraceById -n 127.0.0.1:9876 -i "some-message-id"
```
- query trace result
```
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Type #ProducerGroup #ClientHost #SendTime #CostTimes #Status
Pub 1623305799667 xxx.xxx.xxx.xxx 2021-06-10 14:16:40 131ms success
```
...@@ -69,6 +69,7 @@ public class TransactionProducer { ...@@ -69,6 +69,7 @@ public class TransactionProducer {
config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis(); config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis();
config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0; config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
config.msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
...@@ -123,8 +124,12 @@ public class TransactionProducer { ...@@ -123,8 +124,12 @@ public class TransactionProducer {
}, 10000, 10000); }, 10000, 10000);
final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
final TransactionMQProducer producer = final TransactionMQProducer producer = new TransactionMQProducer(
new TransactionMQProducer("benchmark_transaction_producer", config.aclEnable ? AclClient.getAclRPCHook() : null); null,
"benchmark_transaction_producer",
config.aclEnable ? AclClient.getAclRPCHook() : null,
config.msgTraceEnable,
null);
producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionCheckListener); producer.setTransactionListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000); producer.setDefaultTopicQueueNums(1000);
...@@ -256,6 +261,10 @@ public class TransactionProducer { ...@@ -256,6 +261,10 @@ public class TransactionProducer {
opt.setRequired(false); opt.setRequired(false);
options.addOption(opt); options.addOption(opt);
opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false");
opt.setRequired(false);
options.addOption(opt);
return options; return options;
} }
} }
...@@ -439,6 +448,7 @@ class TxSendConfig { ...@@ -439,6 +448,7 @@ class TxSendConfig {
long batchId; long batchId;
int sendInterval; int sendInterval;
boolean aclEnable; boolean aclEnable;
boolean msgTraceEnable;
} }
class LRUMap<K, V> extends LinkedHashMap<K, V> { class LRUMap<K, V> extends LinkedHashMap<K, V> {
......
...@@ -65,6 +65,39 @@ public class Producer { ...@@ -65,6 +65,39 @@ public class Producer {
* Call send message to deliver message to one of brokers. * Call send message to deliver message to one of brokers.
*/ */
SendResult sendResult = producer.send(msg); SendResult sendResult = producer.send(msg);
/*
* There are different ways to send message, if you don't care about the send result,you can use this way
* {@code
* producer.sendOneway(msg);
* }
*/
/*
* if you want to get the send result in a synchronize way, you can use this send method
* {@code
* SendResult sendResult = producer.send(msg);
* System.out.printf("%s%n", sendResult);
* }
*/
/*
* if you want to get the send result in a asynchronize way, you can use this send method
* {@code
*
* producer.send(msg, new SendCallback() {
* @Override
* public void onSuccess(SendResult sendResult) {
* // do something
* }
*
* @Override
* public void onException(Throwable e) {
* // do something
* }
*});
*
*}
*/
System.out.printf("%s%n", sendResult); System.out.printf("%s%n", sendResult);
} catch (Exception e) { } catch (Exception e) {
......
...@@ -537,7 +537,7 @@ ...@@ -537,7 +537,7 @@
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
<version>4.0.42.Final</version> <version>4.1.65.Final</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
......
...@@ -52,6 +52,8 @@ public class NettyLogger { ...@@ -52,6 +52,8 @@ public class NettyLogger {
private InternalLogger logger = null; private InternalLogger logger = null;
private static final String EXCEPTION_MESSAGE = "Unexpected exception:";
public NettyBridgeLogger(String name) { public NettyBridgeLogger(String name) {
logger = InternalLoggerFactory.getLogger(name); logger = InternalLoggerFactory.getLogger(name);
} }
...@@ -161,6 +163,25 @@ public class NettyLogger { ...@@ -161,6 +163,25 @@ public class NettyLogger {
} }
} }
@Override
public void log(InternalLogLevel internalLogLevel, Throwable throwable) {
if (internalLogLevel.equals(InternalLogLevel.DEBUG)) {
logger.debug(EXCEPTION_MESSAGE, throwable);
}
if (internalLogLevel.equals(InternalLogLevel.TRACE)) {
logger.info(EXCEPTION_MESSAGE, throwable);
}
if (internalLogLevel.equals(InternalLogLevel.INFO)) {
logger.info(EXCEPTION_MESSAGE, throwable);
}
if (internalLogLevel.equals(InternalLogLevel.WARN)) {
logger.warn(EXCEPTION_MESSAGE, throwable);
}
if (internalLogLevel.equals(InternalLogLevel.ERROR)) {
logger.error(EXCEPTION_MESSAGE, throwable);
}
}
@Override @Override
public boolean isTraceEnabled() { public boolean isTraceEnabled() {
return isEnabled(InternalLogLevel.TRACE); return isEnabled(InternalLogLevel.TRACE);
...@@ -191,6 +212,11 @@ public class NettyLogger { ...@@ -191,6 +212,11 @@ public class NettyLogger {
logger.info(var1, var2); logger.info(var1, var2);
} }
@Override
public void trace(Throwable var1) {
logger.info(EXCEPTION_MESSAGE, var1);
}
@Override @Override
public boolean isDebugEnabled() { public boolean isDebugEnabled() {
return isEnabled(InternalLogLevel.DEBUG); return isEnabled(InternalLogLevel.DEBUG);
...@@ -221,6 +247,11 @@ public class NettyLogger { ...@@ -221,6 +247,11 @@ public class NettyLogger {
logger.debug(var1, var2); logger.debug(var1, var2);
} }
@Override
public void debug(Throwable var1) {
logger.debug(EXCEPTION_MESSAGE, var1);
}
@Override @Override
public boolean isInfoEnabled() { public boolean isInfoEnabled() {
return isEnabled(InternalLogLevel.INFO); return isEnabled(InternalLogLevel.INFO);
...@@ -251,6 +282,11 @@ public class NettyLogger { ...@@ -251,6 +282,11 @@ public class NettyLogger {
logger.info(var1, var2); logger.info(var1, var2);
} }
@Override
public void info(Throwable var1) {
logger.info(EXCEPTION_MESSAGE, var1);
}
@Override @Override
public boolean isWarnEnabled() { public boolean isWarnEnabled() {
return isEnabled(InternalLogLevel.WARN); return isEnabled(InternalLogLevel.WARN);
...@@ -281,6 +317,11 @@ public class NettyLogger { ...@@ -281,6 +317,11 @@ public class NettyLogger {
logger.warn(var1, var2); logger.warn(var1, var2);
} }
@Override
public void warn(Throwable var1) {
logger.warn(EXCEPTION_MESSAGE, var1);
}
@Override @Override
public boolean isErrorEnabled() { public boolean isErrorEnabled() {
return isEnabled(InternalLogLevel.ERROR); return isEnabled(InternalLogLevel.ERROR);
...@@ -310,6 +351,11 @@ public class NettyLogger { ...@@ -310,6 +351,11 @@ public class NettyLogger {
public void error(String var1, Throwable var2) { public void error(String var1, Throwable var2) {
logger.error(var1, var2); logger.error(var1, var2);
} }
@Override
public void error(Throwable var1) {
logger.error(EXCEPTION_MESSAGE, var1);
}
} }
} }
...@@ -553,7 +553,7 @@ public abstract class NettyRemotingAbstract { ...@@ -553,7 +553,7 @@ public abstract class NettyRemotingAbstract {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else { } else {
String info = String.format( String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis, timeoutMillis,
this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits() this.semaphoreOneway.availablePermits()
......
...@@ -19,8 +19,8 @@ package org.apache.rocketmq.store; ...@@ -19,8 +19,8 @@ package org.apache.rocketmq.store;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -1185,7 +1185,7 @@ public class CommitLog { ...@@ -1185,7 +1185,7 @@ public class CommitLog {
this.mappedFileQueue.destroy(); this.mappedFileQueue.destroy();
} }
public boolean appendData(long startOffset, byte[] data) { public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
putMessageLock.lock(); putMessageLock.lock();
try { try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset); MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
...@@ -1194,7 +1194,7 @@ public class CommitLog { ...@@ -1194,7 +1194,7 @@ public class CommitLog {
return false; return false;
} }
return mappedFile.appendMessage(data); return mappedFile.appendMessage(data, dataStart, dataLength);
} finally { } finally {
putMessageLock.unlock(); putMessageLock.unlock();
} }
...@@ -1403,24 +1403,32 @@ public class CommitLog { ...@@ -1403,24 +1403,32 @@ public class CommitLog {
* GroupCommit Service * GroupCommit Service
*/ */
class GroupCommitService extends FlushCommitLogService { class GroupCommitService extends FlushCommitLogService {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>(); private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
private final PutMessageSpinLock lock = new PutMessageSpinLock();
public synchronized void putRequest(final GroupCommitRequest request) { public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) { lock.lock();
try {
this.requestsWrite.add(request); this.requestsWrite.add(request);
} finally {
lock.unlock();
} }
this.wakeup(); this.wakeup();
} }
private void swapRequests() { private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite; lock.lock();
try {
LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead; this.requestsWrite = this.requestsRead;
this.requestsRead = tmp; this.requestsRead = tmp;
} finally {
lock.unlock();
}
} }
private void doCommit() { private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) { if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) { for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of // There may be a message in the next file, so a maximum of
...@@ -1439,14 +1447,13 @@ public class CommitLog { ...@@ -1439,14 +1447,13 @@ public class CommitLog {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
} }
this.requestsRead.clear(); this.requestsRead = new LinkedList<>();
} else { } else {
// Because of individual messages is set to not sync flush, it // Because of individual messages is set to not sync flush, it
// will come to this process // will come to this process
CommitLog.this.mappedFileQueue.flush(0); CommitLog.this.mappedFileQueue.flush(0);
} }
} }
}
public void run() { public void run() {
CommitLog.log.info(this.getServiceName() + " service started"); CommitLog.log.info(this.getServiceName() + " service started");
......
...@@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore { ...@@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore {
} }
@Override @Override
public boolean appendToCommitLog(long startOffset, byte[] data) { public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
if (this.shutdown) { if (this.shutdown) {
log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
return false; return false;
} }
boolean result = this.commitLog.appendData(startOffset, data); boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
if (result) { if (result) {
this.reputMessageService.wakeup(); this.reputMessageService.wakeup();
} else { } else {
......
...@@ -301,7 +301,7 @@ public class MappedFile extends ReferenceResource { ...@@ -301,7 +301,7 @@ public class MappedFile extends ReferenceResource {
} }
if (this.isAbleToCommit(commitLeastPages)) { if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) { if (this.hold()) {
commit0(commitLeastPages); commit0();
this.release(); this.release();
} else { } else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
...@@ -317,11 +317,11 @@ public class MappedFile extends ReferenceResource { ...@@ -317,11 +317,11 @@ public class MappedFile extends ReferenceResource {
return this.committedPosition.get(); return this.committedPosition.get();
} }
protected void commit0(final int commitLeastPages) { protected void commit0() {
int writePos = this.wrotePosition.get(); int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get(); int lastCommittedPosition = this.committedPosition.get();
if (writePos - lastCommittedPosition > commitLeastPages) { if (writePos - lastCommittedPosition > 0) {
try { try {
ByteBuffer byteBuffer = writeBuffer.slice(); ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition); byteBuffer.position(lastCommittedPosition);
......
...@@ -245,9 +245,11 @@ public interface MessageStore { ...@@ -245,9 +245,11 @@ public interface MessageStore {
* *
* @param startOffset starting offset. * @param startOffset starting offset.
* @param data data to append. * @param data data to append.
* @param dataStart the start index of data array
* @param dataLength the length of data array
* @return true if success; false otherwise. * @return true if success; false otherwise.
*/ */
boolean appendToCommitLog(final long startOffset, final byte[] data); boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);
/** /**
* Execute file deletion manually. * Execute file deletion manually.
......
...@@ -48,13 +48,6 @@ public class SelectMappedBufferResult { ...@@ -48,13 +48,6 @@ public class SelectMappedBufferResult {
this.byteBuffer.limit(this.size); this.byteBuffer.limit(this.size);
} }
// @Override
// protected void finalize() {
// if (this.mappedFile != null) {
// this.release();
// }
// }
public synchronized void release() { public synchronized void release() {
if (this.mappedFile != null) { if (this.mappedFile != null) {
this.mappedFile.release(); this.mappedFile.release();
......
...@@ -57,6 +57,7 @@ public class StoreStatsService extends ServiceThread { ...@@ -57,6 +57,7 @@ public class StoreStatsService extends ServiceThread {
private final LinkedList<CallSnapshot> getTimesMissList = new LinkedList<CallSnapshot>(); private final LinkedList<CallSnapshot> getTimesMissList = new LinkedList<CallSnapshot>();
private final LinkedList<CallSnapshot> transferedMsgCountList = new LinkedList<CallSnapshot>(); private final LinkedList<CallSnapshot> transferedMsgCountList = new LinkedList<CallSnapshot>();
private volatile AtomicLong[] putMessageDistributeTime; private volatile AtomicLong[] putMessageDistributeTime;
private volatile AtomicLong[] lastPutMessageDistributeTime;
private long messageStoreBootTimestamp = System.currentTimeMillis(); private long messageStoreBootTimestamp = System.currentTimeMillis();
private volatile long putMessageEntireTimeMax = 0; private volatile long putMessageEntireTimeMax = 0;
private volatile long getMessageEntireTimeMax = 0; private volatile long getMessageEntireTimeMax = 0;
...@@ -80,11 +81,11 @@ public class StoreStatsService extends ServiceThread { ...@@ -80,11 +81,11 @@ public class StoreStatsService extends ServiceThread {
next[i] = new AtomicLong(0); next[i] = new AtomicLong(0);
} }
AtomicLong[] old = this.putMessageDistributeTime; this.lastPutMessageDistributeTime = this.putMessageDistributeTime;
this.putMessageDistributeTime = next; this.putMessageDistributeTime = next;
return old; return lastPutMessageDistributeTime;
} }
public long getPutMessageEntireTimeMax() { public long getPutMessageEntireTimeMax() {
...@@ -298,7 +299,7 @@ public class StoreStatsService extends ServiceThread { ...@@ -298,7 +299,7 @@ public class StoreStatsService extends ServiceThread {
} }
private String putMessageDistributeTimeToString() { private String putMessageDistributeTimeToString() {
final AtomicLong[] times = this.putMessageDistributeTime; final AtomicLong[] times = this.lastPutMessageDistributeTime;
if (null == times) if (null == times)
return null; return null;
......
...@@ -562,11 +562,13 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -562,11 +562,13 @@ public class DLedgerCommitLog extends CommitLog {
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData); request.setBatchMsgs(encodeResult.batchData);
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) { if (appendFuture.getPos() == -1) {
log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
} }
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;
long wroteOffset = 0; long wroteOffset = 0;
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
...@@ -789,11 +791,13 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -789,11 +791,13 @@ public class DLedgerCommitLog extends CommitLog {
request.setGroup(dLedgerConfig.getGroup()); request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData); request.setBatchMsgs(encodeResult.batchData);
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request); AppendFuture<AppendEntryResponse> appendFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) { if (appendFuture.getPos() == -1) {
log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode()); log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
} }
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) appendFuture;
long wroteOffset = 0; long wroteOffset = 0;
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
...@@ -901,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog { ...@@ -901,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog {
} }
@Override @Override
public boolean appendData(long startOffset, byte[] data) { public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
//the old ha service will invoke method, here to prevent it //the old ha service will invoke method, here to prevent it
return false; return false;
} }
......
...@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey; ...@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
...@@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.PutMessageStatus;
public class HAService { public class HAService {
...@@ -254,12 +254,16 @@ public class HAService { ...@@ -254,12 +254,16 @@ public class HAService {
class GroupTransferService extends ServiceThread { class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject(); private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>(); private final PutMessageSpinLock lock = new PutMessageSpinLock();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>(); private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) { public void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) { lock.lock();
try {
this.requestsWrite.add(request); this.requestsWrite.add(request);
} finally {
lock.unlock();
} }
this.wakeup(); this.wakeup();
} }
...@@ -269,13 +273,17 @@ public class HAService { ...@@ -269,13 +273,17 @@ public class HAService {
} }
private void swapRequests() { private void swapRequests() {
List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite; lock.lock();
try {
LinkedList<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead; this.requestsWrite = this.requestsRead;
this.requestsRead = tmp; this.requestsRead = tmp;
} finally {
lock.unlock();
}
} }
private void doWaitTransfer() { private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) { if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) { for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
...@@ -293,8 +301,7 @@ public class HAService { ...@@ -293,8 +301,7 @@ public class HAService {
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT); req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
} }
this.requestsRead.clear(); this.requestsRead = new LinkedList<>();
}
} }
} }
...@@ -433,7 +440,6 @@ public class HAService { ...@@ -433,7 +440,6 @@ public class HAService {
private boolean dispatchReadRequest() { private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) { while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition; int diff = this.byteBufferRead.position() - this.dispatchPosition;
...@@ -452,13 +458,12 @@ public class HAService { ...@@ -452,13 +458,12 @@ public class HAService {
} }
if (diff >= (msgHeaderSize + bodySize)) { if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize]; byte[] bodyData = byteBufferRead.array();
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize); int dataStart = this.dispatchPosition + msgHeaderSize;
this.byteBufferRead.get(bodyData);
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize; this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) { if (!reportSlaveMaxOffsetPlus()) {
......
...@@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName; ...@@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class WaitNotifyObject { public class WaitNotifyObject {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable = protected final ConcurrentHashMap<Long/* thread id */, AtomicBoolean/* notified */> waitingThreadTable =
new HashMap<Long, Boolean>(16); new ConcurrentHashMap<Long, AtomicBoolean>(16);
protected volatile boolean hasNotified = false; protected AtomicBoolean hasNotified = new AtomicBoolean(false);
public void wakeup() { public void wakeup() {
boolean needNotify = hasNotified.compareAndSet(false, true);
if (needNotify) {
synchronized (this) { synchronized (this) {
if (!this.hasNotified) {
this.hasNotified = true;
this.notify(); this.notify();
} }
} }
} }
protected void waitForRunning(long interval) { protected void waitForRunning(long interval) {
synchronized (this) { if (this.hasNotified.compareAndSet(true, false)) {
if (this.hasNotified) {
this.hasNotified = false;
this.onWaitEnd(); this.onWaitEnd();
return; return;
} }
synchronized (this) {
try { try {
if (this.hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
this.wait(interval); this.wait(interval);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Interrupted", e); log.error("Interrupted", e);
} finally { } finally {
this.hasNotified = false; this.hasNotified.set(false);
this.onWaitEnd(); this.onWaitEnd();
} }
} }
...@@ -63,15 +66,14 @@ public class WaitNotifyObject { ...@@ -63,15 +66,14 @@ public class WaitNotifyObject {
} }
public void wakeupAll() { public void wakeupAll() {
synchronized (this) {
boolean needNotify = false; boolean needNotify = false;
for (Map.Entry<Long,AtomicBoolean> entry : this.waitingThreadTable.entrySet()) {
for (Map.Entry<Long,Boolean> entry : this.waitingThreadTable.entrySet()) { if (entry.getValue().compareAndSet(false, true)) {
needNotify = needNotify || !entry.getValue(); needNotify = true;
entry.setValue(true); }
} }
if (needNotify) { if (needNotify) {
synchronized (this) {
this.notifyAll(); this.notifyAll();
} }
} }
...@@ -79,20 +81,22 @@ public class WaitNotifyObject { ...@@ -79,20 +81,22 @@ public class WaitNotifyObject {
public void allWaitForRunning(long interval) { public void allWaitForRunning(long interval) {
long currentThreadId = Thread.currentThread().getId(); long currentThreadId = Thread.currentThread().getId();
synchronized (this) { AtomicBoolean notified = this.waitingThreadTable.computeIfAbsent(currentThreadId, k -> new AtomicBoolean(false));
Boolean notified = this.waitingThreadTable.get(currentThreadId); if (notified.compareAndSet(true, false)) {
if (notified != null && notified) {
this.waitingThreadTable.put(currentThreadId, false);
this.onWaitEnd(); this.onWaitEnd();
return; return;
} }
synchronized (this) {
try { try {
if (notified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
this.wait(interval); this.wait(interval);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Interrupted", e); log.error("Interrupted", e);
} finally { } finally {
this.waitingThreadTable.put(currentThreadId, false); notified.set(false);
this.onWaitEnd(); this.onWaitEnd();
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册