提交 ff6b6871 编写于 作者: D duhenglucky

Fix the issue of storesize and bornhost inaccurate in push message

上级 14a75e5c
......@@ -78,13 +78,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
// String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
SocketAddress bornHost = ctx.channel().remoteAddress();
SocketAddress bornHost = RemotingHelper.string2SocketAddress(requestHeader.getBornHost());
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(bornHost, request, mqtraceContext, requestHeader);
......@@ -350,12 +348,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(requestHeader.getBornHost());
msgInner.setBornHost(remoteAddress);
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
msgInner.setBornHost(remoteAddress);
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String bornHost = msgInner.getStoreHostBytes(hostHolder).toString();
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
......@@ -548,14 +544,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor.setProperties(messageExtBatch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
messageExtBatch.setBornHost(requestHeader.getBornHost());
messageExtBatch.setBornHost(remoteAddress);
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String storeHost = messageExtBatch.getStoreHostBytes(hostHolder).toString();
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
return handlePutMessageResult(putMessageResult, response, request, messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost);
......
package org.apache.rocketmq.common.protocol.body;/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -14,7 +14,7 @@ package org.apache.rocketmq.common.protocol.body;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap;
import java.util.Set;
import org.apache.rocketmq.common.protocol.route.SnodeData;
......
......@@ -20,7 +20,6 @@
*/
package org.apache.rocketmq.common.protocol.header;
import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
......@@ -56,9 +55,9 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private String enodeName;
private SocketAddress bornHost;
private String bornHost;
private SocketAddress snodeHost;
private String snodeHost;
@Override
public void checkFields() throws RemotingCommandException {
......@@ -176,19 +175,19 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
this.enodeName = enodeName;
}
public SocketAddress getBornHost() {
public String getBornHost() {
return bornHost;
}
public void setBornHost(SocketAddress bornHost) {
public void setBornHost(String bornHost) {
this.bornHost = bornHost;
}
public SocketAddress getSnodeHost() {
public String getSnodeHost() {
return snodeHost;
}
public void setSnodeHost(SocketAddress snodeHost) {
public void setSnodeHost(String snodeHost) {
this.snodeHost = snodeHost;
}
......
......@@ -17,7 +17,6 @@
package org.apache.rocketmq.common.protocol.header;
import java.net.SocketAddress;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
......@@ -57,9 +56,9 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
private String n; //enode name
private SocketAddress o; //born host
private String o; //born host
private SocketAddress p; //snode host
private String p; //snode host
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
......@@ -219,19 +218,19 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
this.n = n;
}
public SocketAddress getO() {
public String getO() {
return o;
}
public void setO(SocketAddress o) {
public void setO(String o) {
this.o = o;
}
public SocketAddress getP() {
public String getP() {
return p;
}
public void setP(SocketAddress p) {
public void setP(String p) {
this.p = p;
}
......
......@@ -153,19 +153,6 @@ public class RemotingUtil {
return isa;
}
public static SocketAddress string2SocketAddressWithIp(final String address) {
String[] s = address.split(":");
try {
String ip = s[0].substring(1);
InetAddress inetAddress = InetAddress.getByName(ip);
InetSocketAddress isa = new InetSocketAddress(inetAddress, Integer.parseInt(s[1]));
return isa;
} catch (Exception e) {
log.error("Failed to obtain address", e);
}
return null;
}
public static String socketAddress2String(final SocketAddress addr) {
StringBuilder sb = new StringBuilder();
InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
......
......@@ -49,7 +49,6 @@ public class RemotingCommand {
private static final int RPC_ONEWAY = 1;
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);
......
......@@ -29,6 +29,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
......@@ -54,7 +55,9 @@ public class SnodeStartup {
public static void main(String[] args) throws IOException, JoranException {
SnodeConfig snodeConfig = loadConfig(args);
if (snodeConfig.isEmbeddedModeEnable()) {
BrokerStartup.start(BrokerStartup.createBrokerController(args));
BrokerController brokerController = BrokerStartup.createBrokerController(args);
BrokerStartup.start(brokerController);
snodeConfig.setSnodeName(brokerController.getBrokerConfig().getBrokerName());
}
SnodeController snodeController = createSnodeController(snodeConfig);
startup(snodeController);
......
......@@ -75,19 +75,6 @@ public class ConsumerOffsetManager {
}
}
// private long parserOffset(final RemotingChannel remotingChannel, final String enodeName, final String group,
// final String topic, final int queueId) {
// try {
// RemotingCommand remotingCommand = queryOffset(remotingChannel, enodeName, group, topic, queueId);
// QueryConsumerOffsetResponseHeader responseHeader =
// (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
// return responseHeader.getOffset();
// } catch (Exception ex) {
// log.error("Load offset from broker error", ex);
// }
// return -1;
// }
public long queryCacheOffset(final String enodeName, final String group, final String topic, final int queueId) {
String key = buildKey(enodeName, topic, group);
ConcurrentMap<Integer, CacheOffset> map = this.offsetTable.get(key);
......
......@@ -28,6 +28,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
......@@ -74,8 +75,9 @@ public class SendMessageProcessor implements RequestProcessor {
request.getCode() == RequestCode.SEND_BATCH_MESSAGE) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN();
sendMessageRequestHeaderV2.setP(remotingChannel.localAddress());
stringBuffer.append(sendMessageRequestHeaderV2.getB());
request.getExtFields().putIfAbsent("o", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
request.getExtFields().putIfAbsent("p", RemotingHelper.parseChannelRemoteAddr(remotingChannel.localAddress()));
} else {
isSendBack = true;
consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
......@@ -85,7 +87,6 @@ public class SendMessageProcessor implements RequestProcessor {
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(remotingChannel, enodeName, request);
sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
final byte[] message = request.getBody();
final boolean needPush = !isSendBack;
final SendMessageRequestHeader sendMessageRequestHeader =
......
......@@ -87,10 +87,11 @@ public class PushServiceImpl implements PushService {
messageExt.setReconsumeTimes(sendMessageRequestHeader.getReconsumeTimes());
messageExt.setCommitLogOffset(sendMessageResponseHeader.getCommitLogOffset());
messageExt.setBornTimestamp(sendMessageRequestHeader.getBornTimestamp());
messageExt.setBornHost(sendMessageRequestHeader.getBornHost());
messageExt.setStoreHost(RemotingUtil.string2SocketAddressWithIp(sendMessageResponseHeader.getStoreHost()));
messageExt.setBornHost(RemotingUtil.string2SocketAddress(sendMessageRequestHeader.getBornHost()));
messageExt.setStoreHost(RemotingUtil.string2SocketAddress(sendMessageResponseHeader.getStoreHost()));
messageExt.setStoreTimestamp(sendMessageResponseHeader.getStoreTimestamp());
messageExt.setWaitStoreMsgOK(false);
messageExt.setStoreSize(sendMessageResponseHeader.getStoreSize());
messageExt.setSysFlag(sendMessageRequestHeader.getSysFlag());
messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message);
......
......@@ -26,14 +26,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册