未验证 提交 e1e2333c 编写于 作者: R rongtong 提交者: GitHub

Merge pull request #2024 from rushsky518/mqclient

[ISSUE #2025] Refactor  MQClientAPIImpl#processSendResponse
...@@ -643,72 +643,64 @@ public class MQClientAPIImpl { ...@@ -643,72 +643,64 @@ public class MQClientAPIImpl {
final Message msg, final Message msg,
final RemotingCommand response final RemotingCommand response
) throws MQBrokerException, RemotingCommandException { ) throws MQBrokerException, RemotingCommandException {
SendStatus sendStatus;
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT: case ResponseCode.FLUSH_DISK_TIMEOUT: {
case ResponseCode.FLUSH_SLAVE_TIMEOUT: sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
break;
}
case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
break;
}
case ResponseCode.SLAVE_NOT_AVAILABLE: { case ResponseCode.SLAVE_NOT_AVAILABLE: {
sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
break;
} }
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
SendStatus sendStatus = SendStatus.SEND_OK; sendStatus = SendStatus.SEND_OK;
switch (response.getCode()) { break;
case ResponseCode.FLUSH_DISK_TIMEOUT: }
sendStatus = SendStatus.FLUSH_DISK_TIMEOUT; default: {
break; throw new MQBrokerException(response.getCode(), response.getRemark());
case ResponseCode.FLUSH_SLAVE_TIMEOUT: }
sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT; }
break;
case ResponseCode.SLAVE_NOT_AVAILABLE:
sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
break;
case ResponseCode.SUCCESS:
sendStatus = SendStatus.SEND_OK;
break;
default:
assert false;
break;
}
SendMessageResponseHeader responseHeader = SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace. //If namespace not null , reset Topic without namespace.
String topic = msg.getTopic(); String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) { if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace()); topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
} }
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId()); MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
String uniqMsgId = MessageClientIDSetter.getUniqID(msg); String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) { if (msg instanceof MessageBatch) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (Message message : (MessageBatch) msg) { for (Message message : (MessageBatch) msg) {
sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message)); sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
} }
uniqMsgId = sb.toString(); uniqMsgId = sb.toString();
} }
SendResult sendResult = new SendResult(sendStatus, SendResult sendResult = new SendResult(sendStatus,
uniqMsgId, uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId()); sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.isEmpty()) { if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID; regionId = MixAll.DEFAULT_TRACE_REGION_ID;
} }
if (traceOn != null && traceOn.equals("false")) { if (traceOn != null && traceOn.equals("false")) {
sendResult.setTraceOn(false); sendResult.setTraceOn(false);
} else { } else {
sendResult.setTraceOn(true); sendResult.setTraceOn(true);
}
sendResult.setRegionId(regionId);
return sendResult;
}
default:
break;
} }
sendResult.setRegionId(regionId);
throw new MQBrokerException(response.getCode(), response.getRemark()); return sendResult;
} }
public PullResult pullMessage( public PullResult pullMessage(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册