提交 3a4e8669 编写于 作者: D duhenglucky

Push MessageExt directly to consumer

上级 41f87a41
...@@ -17,6 +17,9 @@ ...@@ -17,6 +17,9 @@
package org.apache.rocketmq.broker.processor; package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
...@@ -42,8 +45,8 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; ...@@ -42,8 +45,8 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
...@@ -51,10 +54,6 @@ import org.apache.rocketmq.store.PutMessageResult; ...@@ -51,10 +54,6 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements RequestProcessor { public class SendMessageProcessor extends AbstractSendMessageProcessor implements RequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList; private List<ConsumeMessageHook> consumeMessageHookList;
...@@ -443,7 +442,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -443,7 +442,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt); responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
responseHeader.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
responseHeader.setStoreTimestamp(putMessageResult.getAppendMessageResult().getStoreTimestamp());
responseHeader.setStoreSize(putMessageResult.getAppendMessageResult().getWroteBytes());
responseHeader.setStoreHost(ctx.channel().localAddress().toString());
doResponse(ctx, request, response); doResponse(ctx, request, response);
if (hasSendMessageHook()) { if (hasSendMessageHook()) {
......
...@@ -99,6 +99,7 @@ public class SnodeConfig { ...@@ -99,6 +99,7 @@ public class SnodeConfig {
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean enablePropertyFilter = true; private boolean enablePropertyFilter = true;
private int loadOffsetInterval = 3000;
/** /**
* Acl feature switch * Acl feature switch
*/ */
...@@ -400,4 +401,13 @@ public class SnodeConfig { ...@@ -400,4 +401,13 @@ public class SnodeConfig {
public void setMetricsEnable(boolean metricsEnable) { public void setMetricsEnable(boolean metricsEnable) {
this.metricsEnable = metricsEnable; this.metricsEnable = metricsEnable;
} }
public int getLoadOffsetInterval() {
return loadOffsetInterval;
}
public void setLoadOffsetInterval(int loadOffsetInterval) {
this.loadOffsetInterval = loadOffsetInterval;
}
} }
...@@ -181,7 +181,7 @@ public class Message implements Serializable { ...@@ -181,7 +181,7 @@ public class Message implements Serializable {
return properties; return properties;
} }
void setProperties(Map<String, String> properties) { public void setProperties(Map<String, String> properties) {
this.properties = properties; this.properties = properties;
} }
......
...@@ -16,9 +16,6 @@ ...@@ -16,9 +16,6 @@
*/ */
package org.apache.rocketmq.common.message; package org.apache.rocketmq.common.message;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
...@@ -29,6 +26,8 @@ import java.util.ArrayList; ...@@ -29,6 +26,8 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
public class MessageDecoder { public class MessageDecoder {
public final static int MSG_ID_LENGTH = 8 + 8; public final static int MSG_ID_LENGTH = 8 + 8;
...@@ -41,7 +40,7 @@ public class MessageDecoder { ...@@ -41,7 +40,7 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = -626843481; public final static int MESSAGE_MAGIC_CODE = -626843481;
public static final char NAME_VALUE_SEPARATOR = 1; public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2; public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8; public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCODE + 4 // 2 MAGICCODE
+ 4 // 3 BODYCRC + 4 // 3 BODYCRC
......
...@@ -215,13 +215,21 @@ public class MessageExt extends Message { ...@@ -215,13 +215,21 @@ public class MessageExt extends Message {
this.preparedTransactionOffset = preparedTransactionOffset; this.preparedTransactionOffset = preparedTransactionOffset;
} }
@Override @Override public String toString() {
public String toString() { return "MessageExt{" +
return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset "queueId=" + queueId +
+ ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost ", storeSize=" + storeSize +
+ ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId ", queueOffset=" + queueOffset +
+ ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes=" ", sysFlag=" + sysFlag +
+ reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset ", bornTimestamp=" + bornTimestamp +
+ ", toString()=" + super.toString() + "]"; ", bornHost=" + bornHost +
", storeTimestamp=" + storeTimestamp +
", storeHost=" + storeHost +
", msgId='" + msgId + '\'' +
", commitLogOffset=" + commitLogOffset +
", bodyCRC=" + bodyCRC +
", reconsumeTimes=" + reconsumeTimes +
", preparedTransactionOffset=" + preparedTransactionOffset +
'}';
} }
} }
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
...@@ -55,6 +56,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -55,6 +56,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private String enodeName; private String enodeName;
private SocketAddress bornHost;
private SocketAddress snodeHost;
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
} }
...@@ -171,6 +176,22 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -171,6 +176,22 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
this.enodeName = enodeName; this.enodeName = enodeName;
} }
public SocketAddress getBornHost() {
return bornHost;
}
public void setBornHost(SocketAddress bornHost) {
this.bornHost = bornHost;
}
public SocketAddress getSnodeHost() {
return snodeHost;
}
public void setSnodeHost(SocketAddress snodeHost) {
this.snodeHost = snodeHost;
}
@Override public String toString() { @Override public String toString() {
return "SendMessageRequestHeader{" + return "SendMessageRequestHeader{" +
"producerGroup='" + producerGroup + '\'' + "producerGroup='" + producerGroup + '\'' +
...@@ -187,6 +208,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -187,6 +208,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
", batch=" + batch + ", batch=" + batch +
", maxReconsumeTimes=" + maxReconsumeTimes + ", maxReconsumeTimes=" + maxReconsumeTimes +
", enodeName='" + enodeName + '\'' + ", enodeName='" + enodeName + '\'' +
", bornHost=" + bornHost +
", snodeHost=" + snodeHost +
'}'; '}';
} }
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
...@@ -56,6 +57,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -56,6 +57,10 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
private String n; //enode name private String n; //enode name
private SocketAddress o; //born host
private SocketAddress p; //snode host
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader(); SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a); v1.setProducerGroup(v2.a);
...@@ -72,6 +77,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -72,6 +77,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1.setMaxReconsumeTimes(v2.l); v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m); v1.setBatch(v2.m);
v1.setEnodeName(v2.n); v1.setEnodeName(v2.n);
v1.setBornHost(v2.getO());
v1.setSnodeHost(v2.getP());
return v1; return v1;
} }
...@@ -91,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -91,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2.l = v1.getMaxReconsumeTimes(); v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch(); v2.m = v1.isBatch();
v2.n = v1.getEnodeName(); v2.n = v1.getEnodeName();
v2.o = v1.getBornHost();
v2.p = v1.getSnodeHost();
return v2; return v2;
} }
...@@ -210,6 +219,22 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -210,6 +219,22 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
this.n = n; this.n = n;
} }
public SocketAddress getO() {
return o;
}
public void setO(SocketAddress o) {
this.o = o;
}
public SocketAddress getP() {
return p;
}
public void setP(SocketAddress p) {
this.p = p;
}
@Override public String toString() { @Override public String toString() {
return "SendMessageRequestHeaderV2{" + return "SendMessageRequestHeaderV2{" +
"a='" + a + '\'' + "a='" + a + '\'' +
...@@ -226,6 +251,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -226,6 +251,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
", l=" + l + ", l=" + l +
", m=" + m + ", m=" + m +
", n='" + n + '\'' + ", n='" + n + '\'' +
", o=" + o +
", p=" + p +
'}'; '}';
} }
} }
\ No newline at end of file
...@@ -31,8 +31,17 @@ public class SendMessageResponseHeader implements CommandCustomHeader { ...@@ -31,8 +31,17 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
private Integer queueId; private Integer queueId;
@CFNotNull @CFNotNull
private Long queueOffset; private Long queueOffset;
private String transactionId; private String transactionId;
private long storeTimestamp;
private String storeHost;
private long commitLogOffset;
private int storeSize;
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
} }
...@@ -68,4 +77,49 @@ public class SendMessageResponseHeader implements CommandCustomHeader { ...@@ -68,4 +77,49 @@ public class SendMessageResponseHeader implements CommandCustomHeader {
public void setTransactionId(String transactionId) { public void setTransactionId(String transactionId) {
this.transactionId = transactionId; this.transactionId = transactionId;
} }
public long getStoreTimestamp() {
return storeTimestamp;
}
public void setStoreTimestamp(long storeTimestamp) {
this.storeTimestamp = storeTimestamp;
}
public long getCommitLogOffset() {
return commitLogOffset;
}
public void setCommitLogOffset(long commitLogOffset) {
this.commitLogOffset = commitLogOffset;
}
public int getStoreSize() {
return storeSize;
}
public void setStoreSize(int storeSize) {
this.storeSize = storeSize;
}
public String getStoreHost() {
return storeHost;
}
public void setStoreHost(String storeHost) {
this.storeHost = storeHost;
}
@Override public String toString() {
return "SendMessageResponseHeader{" +
"msgId='" + msgId + '\'' +
", queueId=" + queueId +
", queueOffset=" + queueOffset +
", transactionId='" + transactionId + '\'' +
", storeTimestamp=" + storeTimestamp +
", storeHost=" + storeHost +
", commitLogOffset=" + commitLogOffset +
", storeSize=" + storeSize +
'}';
}
} }
...@@ -61,7 +61,7 @@ public class Consumer { ...@@ -61,7 +61,7 @@ public class Consumer {
/* /*
* Register callback to execute on arrival of messages fetched from brokers. * Register callback to execute on arrival of messages fetched from brokers.
*/ */
consumer.setNamesrvAddr("47.102.149.193:9876"); // consumer.setNamesrvAddr("47.102.149.193:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
......
...@@ -31,7 +31,6 @@ import java.nio.channels.SocketChannel; ...@@ -31,7 +31,6 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
...@@ -154,6 +153,19 @@ public class RemotingUtil { ...@@ -154,6 +153,19 @@ public class RemotingUtil {
return isa; return isa;
} }
public static SocketAddress string2SocketAddressWithIp(final String address) {
String[] s = address.split(":");
try {
String ip = s[0].substring(1);
InetAddress inetAddress = InetAddress.getByName(ip);
InetSocketAddress isa = new InetSocketAddress(inetAddress, Integer.parseInt(s[1]));
return isa;
} catch (Exception e) {
log.error("Failed to obtain address", e);
}
return null;
}
public static String socketAddress2String(final SocketAddress addr) { public static String socketAddress2String(final SocketAddress addr) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr; InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
......
...@@ -171,14 +171,6 @@ public class SnodeController { ...@@ -171,14 +171,6 @@ public class SnodeController {
"SnodeHeartbeatThread", "SnodeHeartbeatThread",
true); true);
// this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
// snodeConfig.getSnodeSendMessageMinPoolSize(),
// snodeConfig.getSnodeSendMessageMaxPoolSize(),
// 3000,
// TimeUnit.MILLISECONDS,
// new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
// "SnodePullMessageThread",
// false);
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor( this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(), snodeConfig.getSnodeSendMessageMinPoolSize(),
......
...@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { ...@@ -56,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet); Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet; clientSet = prev != null ? prev : clientSet;
} }
log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel); clientSet.add(remotingChannel);
} }
} }
......
...@@ -16,16 +16,13 @@ ...@@ -16,16 +16,13 @@
*/ */
package org.apache.rocketmq.snode.offset; package org.apache.rocketmq.snode.offset;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
...@@ -37,10 +34,10 @@ public class ConsumerOffsetManager { ...@@ -37,10 +34,10 @@ public class ConsumerOffsetManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@"; private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, Long>> offsetTable = private ConcurrentMap<String/* Enode@Topic@Group */, ConcurrentMap<Integer, CacheOffset>> offsetTable =
new ConcurrentHashMap<>(512); new ConcurrentHashMap<>(512);
private transient SnodeController snodeController; private SnodeController snodeController;
public ConsumerOffsetManager(SnodeController brokerController) { public ConsumerOffsetManager(SnodeController brokerController) {
this.snodeController = brokerController; this.snodeController = brokerController;
...@@ -57,74 +54,63 @@ public class ConsumerOffsetManager { ...@@ -57,74 +54,63 @@ public class ConsumerOffsetManager {
return sb.toString(); return sb.toString();
} }
private boolean offsetBehindMuchThanData(final String enodeName, final String topic,
ConcurrentMap<Integer, Long> table) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
boolean result = !table.isEmpty();
while (it.hasNext() && result) {
Entry<Integer, Long> next = it.next();
RemotingCommand remotingCommand = this.snodeController.getEnodeService().getMinOffsetInQueue(enodeName, topic, next.getKey());
long minOffsetInStore = 0;
if (remotingCommand != null) {
switch (remotingCommand.getCode()) {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
minOffsetInStore = responseHeader.getOffset();
}
default:
break;
}
} else {
throw new SnodeException(ResponseCode.QUERY_OFFSET_ERROR, "Query min offset error!");
}
long offsetInPersist = next.getValue();
result = offsetInPersist <= minOffsetInStore;
}
return result;
}
public void cacheOffset(final String enodeName, final String clientHost, final String group, final String topic, public void cacheOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId, final int queueId,
final long offset) { final long offset) {
// Topic@group // EnodeName@Topic@group
String key = buildKey(enodeName, topic, group); String key = buildKey(enodeName, topic, group);
this.commitOffset(clientHost, key, queueId, offset); this.commitOffset(clientHost, key, queueId, offset);
} }
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
CacheOffset cacheOffset = new CacheOffset(key, offset, System.currentTimeMillis());
if (null == map) { if (null == map) {
map = new ConcurrentHashMap<>(32); map = new ConcurrentHashMap<>(32);
ConcurrentMap<Integer, Long> prev = this.offsetTable.putIfAbsent(key, map); ConcurrentMap<Integer, CacheOffset> prev = this.offsetTable.putIfAbsent(key, map);
map = prev != null ? prev : map; map = prev != null ? prev : map;
map.put(queueId, offset); map.put(queueId, cacheOffset);
} else { } else {
Long storeOffset = map.put(queueId, offset); CacheOffset storeOffset = map.put(queueId, cacheOffset);
if (storeOffset != null && offset < storeOffset) { if (storeOffset != null && offset < storeOffset.getOffset()) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}", clientHost, key, queueId, offset, storeOffset); log.warn("[NOTIFYME]update consumer offset less than store. clientHost: {}, key: {}, queueId: {}, requestOffset: {}, storeOffset: {}",
clientHost, key, queueId, offset, storeOffset);
} }
} }
} }
public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) { private long parserOffset(final String enodeName, final String group, final String topic, final int queueId) {
String key = buildKey(enodeName, topic, group); try {
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); RemotingCommand remotingCommand = queryOffset(enodeName, group, topic, queueId);
if (null != map) { QueryConsumerOffsetResponseHeader responseHeader =
Long offset = map.get(queueId); (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
if (offset != null) return responseHeader.getOffset();
return offset; } catch (Exception ex) {
log.error("Load offset from broker error", ex);
} }
return -1; return -1;
} }
public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() { public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
return offsetTable; String key = buildKey(enodeName, topic, group);
} ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
if (map == null) {
map = new ConcurrentHashMap<>();
map = this.offsetTable.putIfAbsent(key, map);
}
CacheOffset cacheOffset = map.get(queueId);
if (cacheOffset != null) {
if (System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > snodeController.getSnodeConfig().getLoadOffsetInterval()) {
cacheOffset.setOffset(parserOffset(enodeName, group, topic, queueId));
cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
}
} else {
cacheOffset = new CacheOffset(key, parserOffset(enodeName, group, topic, queueId), System.currentTimeMillis());
map.put(queueId, cacheOffset);
}
return cacheOffset.getOffset();
}
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic, public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId, final int queueId,
...@@ -139,4 +125,39 @@ public class ConsumerOffsetManager { ...@@ -139,4 +125,39 @@ public class ConsumerOffsetManager {
return this.snodeController.getEnodeService().loadOffset(enodeName, group, topic, queueId); return this.snodeController.getEnodeService().loadOffset(enodeName, group, topic, queueId);
} }
public class CacheOffset {
private String key;
private long offset;
private long updateTimestamp;
public CacheOffset(final String key, final long offset, final long updateTimestamp) {
this.key = key;
this.offset = offset;
this.updateTimestamp = updateTimestamp;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public long getUpdateTimestamp() {
return updateTimestamp;
}
public void setUpdateTimestamp(long updateTimestamp) {
this.updateTimestamp = updateTimestamp;
}
}
} }
...@@ -71,6 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor { ...@@ -71,6 +71,7 @@ public class HeartbeatProcessor implements RequestProcessor {
private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) { private RemotingCommand register(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
log.info("heartbeatData: {}", heartbeatData);
Channel channel = null; Channel channel = null;
Attribute<Client> clientAttribute = null; Attribute<Client> clientAttribute = null;
if (remotingChannel instanceof NettyChannelHandlerContextImpl) { if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
......
...@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.constant.LoggerName; ...@@ -22,6 +22,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
...@@ -73,6 +74,7 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -73,6 +74,7 @@ public class SendMessageProcessor implements RequestProcessor {
request.getCode() == RequestCode.SEND_BATCH_MESSAGE) { request.getCode() == RequestCode.SEND_BATCH_MESSAGE) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN(); enodeName = sendMessageRequestHeaderV2.getN();
sendMessageRequestHeaderV2.setP(remotingChannel.localAddress());
stringBuffer.append(sendMessageRequestHeaderV2.getB()); stringBuffer.append(sendMessageRequestHeaderV2.getB());
} else { } else {
isSendBack = true; isSendBack = true;
...@@ -82,9 +84,12 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -82,9 +84,12 @@ public class SendMessageProcessor implements RequestProcessor {
} }
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request); CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
final Integer queueId = sendMessageRequestHeaderV2.getE();
sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
final byte[] message = request.getBody(); final byte[] message = request.getBody();
final boolean isNeedPush = !isSendBack; final boolean isNeedPush = !isSendBack;
final SendMessageRequestHeader sendMessageRequestHeader =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2);
responseFuture.whenComplete((data, ex) -> { responseFuture.whenComplete((data, ex) -> {
if (ex == null) { if (ex == null) {
if (this.snodeController.getSendMessageInterceptorGroup() != null) { if (this.snodeController.getSendMessageInterceptorGroup() != null) {
...@@ -94,7 +99,8 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -94,7 +99,8 @@ public class SendMessageProcessor implements RequestProcessor {
remotingChannel.reply(data); remotingChannel.reply(data);
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length); this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
this.snodeController.getPushService().pushMessage(enodeName, stringBuffer.toString(), queueId, message, data); log.info("Send message response: {}", data);
this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, message, data);
} }
} else { } else {
this.snodeController.getMetricsService().incRequestCount(request.getCode(), false); this.snodeController.getMetricsService().incRequestCount(request.getCode(), false);
......
...@@ -16,19 +16,20 @@ ...@@ -16,19 +16,20 @@
*/ */
package org.apache.rocketmq.snode.service; package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface PushService { public interface PushService {
/** /**
* TODO how to resolve the slow consumer: close or ignore? * Push message to consumer which subscribed target {@link MessageQueue}
* <p>
* *
* @param enodeName * @param requestHeader Send message request header
* @param topic * @param message Message body
* @param queueId * @param response Send message response
* @param message
* @param response
*/ */
void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message, void pushMessage(final SendMessageRequestHeader requestHeader, final byte[] message,
final RemotingCommand response); final RemotingCommand response);
void shutdown(); void shutdown();
......
...@@ -23,15 +23,20 @@ import java.util.concurrent.ArrayBlockingQueue; ...@@ -23,15 +23,20 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.PushMessageHeader; import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.snode.SnodeController;
...@@ -61,34 +66,58 @@ public class PushServiceImpl implements PushService { ...@@ -61,34 +66,58 @@ public class PushServiceImpl implements PushService {
public class PushTask implements Runnable { public class PushTask implements Runnable {
private AtomicBoolean canceled = new AtomicBoolean(false); private AtomicBoolean canceled = new AtomicBoolean(false);
private final byte[] message; private final byte[] message;
private final Integer queueId; private final RemotingCommand sendMessageResponse;
private final String topic; private final SendMessageRequestHeader sendMessageRequestHeader;
private final RemotingCommand response;
private final String enodeName;
public PushTask(final String topic, final Integer queueId, final byte[] message, public PushTask(final SendMessageRequestHeader sendMessageRequestHeader, final byte[] message,
final RemotingCommand response, final String enodeName) { final RemotingCommand response) {
this.message = message; this.message = message;
this.queueId = queueId; this.sendMessageRequestHeader = sendMessageRequestHeader;
this.topic = topic; this.sendMessageResponse = response;
this.response = response; }
this.enodeName = enodeName;
private MessageExt buildMessageExt(final SendMessageResponseHeader sendMessageResponseHeader,
final byte[] message, final SendMessageRequestHeader sendMessageRequestHeader) {
MessageExt messageExt = new MessageExt();
messageExt.setProperties(MessageDecoder.string2messageProperties(sendMessageRequestHeader.getProperties()));
messageExt.setTopic(sendMessageRequestHeader.getTopic());
messageExt.setMsgId(sendMessageResponseHeader.getMsgId());
messageExt.setQueueId(sendMessageResponseHeader.getQueueId());
messageExt.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
messageExt.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes());
messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset());
messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp());
messageExt.setBornHost(sendMessageRequestHeader.getBornHost());
// messageExt.setStoreSize(sendMessageResponseHeader.getStoreSize());
messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost()));
messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp());
messageExt.setWaitStoreMsgOK(false);
messageExt.setSnodeAddress(sendMessageRequestHeader.getSnodeHost());
messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag());
messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message);
messageExt.setBodyCRC(UtilAll.crc32(message));
log.info("MessageExt:{}", messageExt);
return messageExt;
} }
@Override @Override
public void run() { public void run() {
if (!canceled.get()) { if (!canceled.get()) {
try { try {
SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); log.info("sendMessageResponse:{}", sendMessageResponse);
PushMessageHeader pushMessageHeader = new PushMessageHeader(); SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset()); log.info("sendMessageResponseHeader:{}", sendMessageResponseHeader);
pushMessageHeader.setTopic(topic); MessageQueue messageQueue = new MessageQueue(sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
pushMessageHeader.setQueueId(queueId);
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
pushMessage.setBody(message);
MessageQueue messageQueue = new MessageQueue(topic, enodeName, queueId);
Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue); Set<RemotingChannel> consumerTable = snodeController.getSubscriptionManager().getPushableChannel(messageQueue);
if (consumerTable != null) { if (consumerTable != null) {
PushMessageHeader pushMessageHeader = new PushMessageHeader();
pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
pushMessageHeader.setTopic(sendMessageRequestHeader.getTopic());
pushMessageHeader.setQueueId(sendMessageResponseHeader.getQueueId());
RemotingCommand pushMessage = RemotingCommand.createRequestCommand(RequestCode.SNODE_PUSH_MESSAGE, pushMessageHeader);
MessageExt messageExt = buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
pushMessage.setBody(MessageDecoder.encode(messageExt, false));
for (RemotingChannel remotingChannel : consumerTable) { for (RemotingChannel remotingChannel : consumerTable) {
Client client = null; Client client = null;
if (remotingChannel instanceof NettyChannelImpl) { if (remotingChannel instanceof NettyChannelImpl) {
...@@ -101,13 +130,14 @@ public class PushServiceImpl implements PushService { ...@@ -101,13 +130,14 @@ public class PushServiceImpl implements PushService {
if (client != null) { if (client != null) {
for (String consumerGroup : client.getGroups()) { for (String consumerGroup : client.getGroups()) {
Subscription subscription = snodeController.getSubscriptionManager().getSubscription(consumerGroup); Subscription subscription = snodeController.getSubscriptionManager().getSubscription(consumerGroup);
if (subscription.getSubscriptionData(topic) != null) { if (subscription.getSubscriptionData(sendMessageRequestHeader.getTopic()) != null) {
boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName); boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId(), consumerGroup, sendMessageRequestHeader.getEnodeName());
if (slowConsumer) { if (slowConsumer) {
log.warn("[SlowConsumer]: {} is slow consumer", remotingChannel); log.warn("[SlowConsumer]: {} is slow consumer", remotingChannel);
snodeController.getSlowConsumerService().slowConsumerResolve(pushMessage, remotingChannel); snodeController.getSlowConsumerService().slowConsumerResolve(pushMessage, remotingChannel);
continue; continue;
} }
pushMessageHeader.setConsumerGroup(consumerGroup);
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS); snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
} }
} }
...@@ -119,10 +149,10 @@ public class PushServiceImpl implements PushService { ...@@ -119,10 +149,10 @@ public class PushServiceImpl implements PushService {
log.info("No online registered as push consumer and online for messageQueue: {} ", messageQueue); log.info("No online registered as push consumer and online for messageQueue: {} ", messageQueue);
} }
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex); log.warn("Push message to topic: {} queueId: {}", sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId(), ex);
} }
} else { } else {
log.info("Push message to topic: {} queueId: {} canceled!", topic, queueId); log.info("Push message to topic: {} queueId: {} canceled!", sendMessageRequestHeader.getTopic(), sendMessageRequestHeader.getQueueId());
} }
} }
...@@ -133,9 +163,9 @@ public class PushServiceImpl implements PushService { ...@@ -133,9 +163,9 @@ public class PushServiceImpl implements PushService {
} }
@Override @Override
public void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message, public void pushMessage(final SendMessageRequestHeader requestHeader, final byte[] message,
final RemotingCommand response) { final RemotingCommand response) {
PushTask pushTask = new PushTask(topic, queueId, message, response, enodeName); PushTask pushTask = new PushTask(requestHeader, message, response);
pushMessageExecutorService.submit(pushTask); pushMessageExecutorService.submit(pushTask);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册