diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 1408cce39b8dc9f794483bfad4eacf2dcee28da7..e5bd8e7f84fec2e66929e988d059651f31e7eb23 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -78,13 +78,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (requestHeader == null) { return null; } - mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; -// String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); - SocketAddress bornHost = ctx.channel().remoteAddress(); + SocketAddress bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost()); if (requestHeader.isBatch()) { response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader); @@ -350,12 +348,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); - msgInner.setBornHost(requestHeader.getBornHost()); + msgInner.setBornHost(remoteAddress); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); msgInner.setBornHost(remoteAddress); -// ByteBuffer hostHolder = ByteBuffer.allocate(8); -// String bornHost = msgInner.getStoreHostBytes(hostHolder).toString(); PutMessageResult putMessageResult = null; Map oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); @@ -548,14 +544,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties())); messageExtBatch.setBody(request.getBody()); messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp()); - messageExtBatch.setBornHost(requestHeader.getBornHost()); + messageExtBatch.setBornHost(remoteAddress); messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); -// ByteBuffer hostHolder = ByteBuffer.allocate(8); -// String storeHost = messageExtBatch.getStoreHostBytes(hostHolder).toString(); - PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java index 2d42bdf770d280b6453b21df8bb234aa827e2ff9..55999afd001b4793eb7ad7facb2d378ff00b54ae 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.common.protocol.body;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,7 +14,7 @@ package org.apache.rocketmq.common.protocol.body;/* * See the License for the specific language governing permissions and * limitations under the License. */ - +package org.apache.rocketmq.common.protocol.body; import java.util.HashMap; import java.util.Set; import org.apache.rocketmq.common.protocol.route.SnodeData; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java index 84ab10db8f42ce201f05af827205ee9a089252ab..2577bf55a38818216a6ea80fe98554d0928fc854 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java @@ -20,7 +20,6 @@ */ package org.apache.rocketmq.common.protocol.header; -import java.net.SocketAddress; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; @@ -56,9 +55,9 @@ public class SendMessageRequestHeader implements CommandCustomHeader { private String enodeName; - private SocketAddress bornHost; + private String bornHost; - private SocketAddress snodeHost; + private String snodeHost; @Override public void checkFields() throws RemotingCommandException { @@ -176,19 +175,19 @@ public class SendMessageRequestHeader implements CommandCustomHeader { this.enodeName = enodeName; } - public SocketAddress getBornHost() { + public String getBornHost() { return bornHost; } - public void setBornHost(SocketAddress bornHost) { + public void setBornHost(String bornHost) { this.bornHost = bornHost; } - public SocketAddress getSnodeHost() { + public String getSnodeHost() { return snodeHost; } - public void setSnodeHost(SocketAddress snodeHost) { + public void setSnodeHost(String snodeHost) { this.snodeHost = snodeHost; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index 3260c258257d2e2ea240d677518dfcf68ee07d08..048d20115591a82bd413b0b3a560bfeff89ddbe4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.common.protocol.header; -import java.net.SocketAddress; import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; @@ -57,9 +56,9 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { private String n; //enode name - private SocketAddress o; //born host + private String o; //born host - private SocketAddress p; //snode host + private String p; //snode host public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { SendMessageRequestHeader v1 = new SendMessageRequestHeader(); @@ -219,19 +218,19 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { this.n = n; } - public SocketAddress getO() { + public String getO() { return o; } - public void setO(SocketAddress o) { + public void setO(String o) { this.o = o; } - public SocketAddress getP() { + public String getP() { return p; } - public void setP(SocketAddress p) { + public void setP(String p) { this.p = p; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index a118c409eb0aa6dba161c5ffa441ee63f0f13e26..4e592cb8f48026bb4775b495a36f7ae7faeccd0d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -153,19 +153,6 @@ public class RemotingUtil { return isa; } - public static SocketAddress string2SocketAddressWithIp(final String address) { - String[] s = address.split(":"); - try { - String ip = s[0].substring(1); - InetAddress inetAddress = InetAddress.getByName(ip); - InetSocketAddress isa = new InetSocketAddress(inetAddress, Integer.parseInt(s[1])); - return isa; - } catch (Exception e) { - log.error("Failed to obtain address", e); - } - return null; - } - public static String socketAddress2String(final SocketAddress addr) { StringBuilder sb = new StringBuilder(); InetSocketAddress inetSocketAddress = (InetSocketAddress) addr; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index d392e79ad7eba9aac9803eab82fa2bf187861105..e823a69c767d1ec8646cc769b65e97e1f5f13dc9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -49,7 +49,6 @@ public class RemotingCommand { private static final int RPC_ONEWAY = 1; - private static volatile int configVersion = -1; private static AtomicInteger requestId = new AtomicInteger(0); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java index 23ea68a18355afee476fb786376b97a1d3b8a7da..70797711d8d9e11dd3ab32a03b3f33f7a1efe088 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java @@ -29,6 +29,7 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.SnodeConfig; @@ -54,7 +55,9 @@ public class SnodeStartup { public static void main(String[] args) throws IOException, JoranException { SnodeConfig snodeConfig = loadConfig(args); if (snodeConfig.isEmbeddedModeEnable()) { - BrokerStartup.start(BrokerStartup.createBrokerController(args)); + BrokerController brokerController = BrokerStartup.createBrokerController(args); + BrokerStartup.start(brokerController); + snodeConfig.setSnodeName(brokerController.getBrokerConfig().getBrokerName()); } SnodeController snodeController = createSnodeController(snodeConfig); startup(snodeController); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java index ba2a80011b20003e9ad8e64f1ea9f714ec176f1a..2cb6e6998d5665def5f10f2e6d74cc8b4335bd95 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java @@ -75,19 +75,6 @@ public class ConsumerOffsetManager { } } -// private long parserOffset(final RemotingChannel remotingChannel, final String enodeName, final String group, -// final String topic, final int queueId) { -// try { -// RemotingCommand remotingCommand = queryOffset(remotingChannel, enodeName, group, topic, queueId); -// QueryConsumerOffsetResponseHeader responseHeader = -// (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); -// return responseHeader.getOffset(); -// } catch (Exception ex) { -// log.error("Load offset from broker error", ex); -// } -// return -1; -// } - public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) { String key = buildKey(enodeName, topic, group); ConcurrentMap map = this.offsetTable.get(key); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index 019794ed87e3deacb26c07afd11556685b80ecf5..dda5774f6de277020cb04f932ed17ad3a7064df8 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -28,6 +28,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.interceptor.ExceptionContext; import org.apache.rocketmq.remoting.interceptor.RequestContext; @@ -74,8 +75,9 @@ public class SendMessageProcessor implements RequestProcessor { request.getCode() == RequestCode.SEND_BATCH_MESSAGE) { sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); enodeName = sendMessageRequestHeaderV2.getN(); - sendMessageRequestHeaderV2.setP(remotingChannel.localAddress()); stringBuffer.append(sendMessageRequestHeaderV2.getB()); + request.getExtFields().putIfAbsent("o", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())); + request.getExtFields().putIfAbsent("p", RemotingHelper.parseChannelRemoteAddr(remotingChannel.localAddress())); } else { isSendBack = true; consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); @@ -85,7 +87,6 @@ public class SendMessageProcessor implements RequestProcessor { CompletableFuture responseFuture = snodeController.getEnodeService().sendMessage(remotingChannel, enodeName, request); - sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress()); final byte[] message = request.getBody(); final boolean needPush = !isSendBack; final SendMessageRequestHeader sendMessageRequestHeader = diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 9f4b016f1346a48630733cafcc800cb981e45fad..24a5a9c290b83e983e3e82f7df6b7af3fc0c390d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -87,10 +87,11 @@ public class PushServiceImpl implements PushService { messageExt.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes()); messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset()); messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp()); - messageExt.setBornHost(sendMessageRequestHeader.getBornHost()); - messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost())); + messageExt.setBornHost(RemotingUtil.string2SocketAddress(sendMessageRequestHeader.getBornHost())); + messageExt.setStoreHost(RemotingUtil.string2SocketAddress(sendMessageResponseHeader.getStoreHost())); messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp()); messageExt.setWaitStoreMsgOK(false); + messageExt.setStoreSize(sendMessageResponseHeader.getStoreSize()); messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag()); messageExt.setFlag(sendMessageRequestHeader.getFlag()); messageExt.setBody(message); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index cbcc1a7b728d1be62a43db41973a77090724abc2..b02d36ed1a49461a4c71d09e62bae47f5a7f9d7d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -26,14 +26,14 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.ha.HAService;