提交 510bb34f 编写于 作者: S ShannonDing

Merge branch 'snode' of github.com:apache/rocketmq into snode

...@@ -78,13 +78,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -78,13 +78,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader == null) { if (requestHeader == null) {
return null; return null;
} }
mqtraceContext = buildMsgContext(ctx, requestHeader); mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext); this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response; RemotingCommand response;
// String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); SocketAddress bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost());
SocketAddress bornHost = ctx.channel().remoteAddress();
if (requestHeader.isBatch()) { if (requestHeader.isBatch()) {
response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader); response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader);
...@@ -350,12 +348,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -350,12 +348,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(requestHeader.getBornHost()); msgInner.setBornHost(remoteAddress);
msgInner.setStoreHost(this.getStoreHost()); msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
msgInner.setBornHost(remoteAddress); msgInner.setBornHost(remoteAddress);
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String bornHost = msgInner.getStoreHostBytes(hostHolder).toString();
PutMessageResult putMessageResult = null; PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
...@@ -548,14 +544,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -548,14 +544,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties())); MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody()); messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp()); messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
messageExtBatch.setBornHost(requestHeader.getBornHost()); messageExtBatch.setBornHost(remoteAddress);
messageExtBatch.setStoreHost(this.getStoreHost()); messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String storeHost = messageExtBatch.getStoreHostBytes(hostHolder).toString();
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch); PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost); return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost);
......
package org.apache.rocketmq.common.protocol.body;/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
...@@ -14,7 +14,7 @@ package org.apache.rocketmq.common.protocol.body;/* ...@@ -14,7 +14,7 @@ package org.apache.rocketmq.common.protocol.body;/*
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.common.protocol.route.SnodeData; import org.apache.rocketmq.common.protocol.route.SnodeData;
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
*/ */
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
...@@ -56,9 +55,9 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -56,9 +55,9 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private String enodeName; private String enodeName;
private SocketAddress bornHost; private String bornHost;
private SocketAddress snodeHost; private String snodeHost;
@Override @Override
public void checkFields() throws RemotingCommandException { public void checkFields() throws RemotingCommandException {
...@@ -176,19 +175,19 @@ public class SendMessageRequestHeader implements CommandCustomHeader { ...@@ -176,19 +175,19 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
this.enodeName = enodeName; this.enodeName = enodeName;
} }
public SocketAddress getBornHost() { public String getBornHost() {
return bornHost; return bornHost;
} }
public void setBornHost(SocketAddress bornHost) { public void setBornHost(String bornHost) {
this.bornHost = bornHost; this.bornHost = bornHost;
} }
public SocketAddress getSnodeHost() { public String getSnodeHost() {
return snodeHost; return snodeHost;
} }
public void setSnodeHost(SocketAddress snodeHost) { public void setSnodeHost(String snodeHost) {
this.snodeHost = snodeHost; this.snodeHost = snodeHost;
} }
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.rocketmq.common.protocol.header; package org.apache.rocketmq.common.protocol.header;
import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
...@@ -57,9 +56,9 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -57,9 +56,9 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
private String n; //enode name private String n; //enode name
private SocketAddress o; //born host private String o; //born host
private SocketAddress p; //snode host private String p; //snode host
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader(); SendMessageRequestHeader v1 = new SendMessageRequestHeader();
...@@ -219,19 +218,19 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader { ...@@ -219,19 +218,19 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
this.n = n; this.n = n;
} }
public SocketAddress getO() { public String getO() {
return o; return o;
} }
public void setO(SocketAddress o) { public void setO(String o) {
this.o = o; this.o = o;
} }
public SocketAddress getP() { public String getP() {
return p; return p;
} }
public void setP(SocketAddress p) { public void setP(String p) {
this.p = p; this.p = p;
} }
......
...@@ -153,19 +153,6 @@ public class RemotingUtil { ...@@ -153,19 +153,6 @@ public class RemotingUtil {
return isa; return isa;
} }
public static SocketAddress string2SocketAddressWithIp(final String address) {
String[] s = address.split(":");
try {
String ip = s[0].substring(1);
InetAddress inetAddress = InetAddress.getByName(ip);
InetSocketAddress isa = new InetSocketAddress(inetAddress, Integer.parseInt(s[1]));
return isa;
} catch (Exception e) {
log.error("Failed to obtain address", e);
}
return null;
}
public static String socketAddress2String(final SocketAddress addr) { public static String socketAddress2String(final SocketAddress addr) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr; InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
......
...@@ -49,7 +49,6 @@ public class RemotingCommand { ...@@ -49,7 +49,6 @@ public class RemotingCommand {
private static final int RPC_ONEWAY = 1; private static final int RPC_ONEWAY = 1;
private static volatile int configVersion = -1; private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0); private static AtomicInteger requestId = new AtomicInteger(0);
......
...@@ -29,6 +29,7 @@ import org.apache.commons.cli.CommandLine; ...@@ -29,6 +29,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup; import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig; import org.apache.rocketmq.common.SnodeConfig;
...@@ -54,7 +55,9 @@ public class SnodeStartup { ...@@ -54,7 +55,9 @@ public class SnodeStartup {
public static void main(String[] args) throws IOException, JoranException { public static void main(String[] args) throws IOException, JoranException {
SnodeConfig snodeConfig = loadConfig(args); SnodeConfig snodeConfig = loadConfig(args);
if (snodeConfig.isEmbeddedModeEnable()) { if (snodeConfig.isEmbeddedModeEnable()) {
BrokerStartup.start(BrokerStartup.createBrokerController(args)); BrokerController brokerController = BrokerStartup.createBrokerController(args);
BrokerStartup.start(brokerController);
snodeConfig.setSnodeName(brokerController.getBrokerConfig().getBrokerName());
} }
SnodeController snodeController = createSnodeController(snodeConfig); SnodeController snodeController = createSnodeController(snodeConfig);
startup(snodeController); startup(snodeController);
......
...@@ -75,19 +75,6 @@ public class ConsumerOffsetManager { ...@@ -75,19 +75,6 @@ public class ConsumerOffsetManager {
} }
} }
// private long parserOffset(final RemotingChannel remotingChannel, final String enodeName, final String group,
// final String topic, final int queueId) {
// try {
// RemotingCommand remotingCommand = queryOffset(remotingChannel, enodeName, group, topic, queueId);
// QueryConsumerOffsetResponseHeader responseHeader =
// (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
// return responseHeader.getOffset();
// } catch (Exception ex) {
// log.error("Load offset from broker error", ex);
// }
// return -1;
// }
public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) { public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
String key = buildKey(enodeName, topic, group); String key = buildKey(enodeName, topic, group);
ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key); ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
......
...@@ -28,6 +28,7 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -28,6 +28,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext; import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.RequestContext; import org.apache.rocketmq.remoting.interceptor.RequestContext;
...@@ -74,8 +75,9 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -74,8 +75,9 @@ public class SendMessageProcessor implements RequestProcessor {
request.getCode() == RequestCode.SEND_BATCH_MESSAGE) { request.getCode() == RequestCode.SEND_BATCH_MESSAGE) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN(); enodeName = sendMessageRequestHeaderV2.getN();
sendMessageRequestHeaderV2.setP(remotingChannel.localAddress());
stringBuffer.append(sendMessageRequestHeaderV2.getB()); stringBuffer.append(sendMessageRequestHeaderV2.getB());
request.getExtFields().putIfAbsent("o", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
request.getExtFields().putIfAbsent("p", RemotingHelper.parseChannelRemoteAddr(remotingChannel.localAddress()));
} else { } else {
isSendBack = true; isSendBack = true;
consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
...@@ -85,7 +87,6 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -85,7 +87,6 @@ public class SendMessageProcessor implements RequestProcessor {
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(remotingChannel, enodeName, request); CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(remotingChannel, enodeName, request);
sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
final byte[] message = request.getBody(); final byte[] message = request.getBody();
final boolean needPush = !isSendBack; final boolean needPush = !isSendBack;
final SendMessageRequestHeader sendMessageRequestHeader = final SendMessageRequestHeader sendMessageRequestHeader =
......
...@@ -87,10 +87,11 @@ public class PushServiceImpl implements PushService { ...@@ -87,10 +87,11 @@ public class PushServiceImpl implements PushService {
messageExt.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes()); messageExt.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes());
messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset()); messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset());
messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp()); messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp());
messageExt.setBornHost(sendMessageRequestHeader.getBornHost()); messageExt.setBornHost(RemotingUtil.string2SocketAddress(sendMessageRequestHeader.getBornHost()));
messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost())); messageExt.setStoreHost(RemotingUtil.string2SocketAddress(sendMessageResponseHeader.getStoreHost()));
messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp()); messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp());
messageExt.setWaitStoreMsgOK(false); messageExt.setWaitStoreMsgOK(false);
messageExt.setStoreSize(sendMessageResponseHeader.getStoreSize());
messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag()); messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag());
messageExt.setFlag(sendMessageRequestHeader.getFlag()); messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message); messageExt.setBody(message);
......
...@@ -26,14 +26,14 @@ import java.util.concurrent.TimeUnit; ...@@ -26,14 +26,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch; import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService; import org.apache.rocketmq.store.ha.HAService;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册