提交 6c640286 编写于 作者: D dongeforever

Polish the json problem

上级 fa398156
......@@ -313,7 +313,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
final TopicQueueMappingBody topicQueueMappingBody = RemotingSerializable.decode(request.getBody(), TopicQueueMappingBody.class);
final TopicQueueMappingDetail topicQueueMappingDetail = RemotingSerializable.decode(request.getBody(), TopicQueueMappingDetail.class);
String topic = requestHeader.getTopic();
......@@ -338,7 +338,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingBody.getMappingDetail(), force);
System.out.println("Broker body:" + new String(request.getBody()));
System.out.println("Broker bodetaildy:" + topicQueueMappingDetail.toJson());
this.brokerController.getTopicQueueMappingManager().updateTopicQueueMapping(topicQueueMappingDetail, force);
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
response.setCode(ResponseCode.SUCCESS);
......
......@@ -80,23 +80,6 @@ public class MQAdminImpl {
this.timeoutMillis = timeoutMillis;
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, boolean force) throws MQClientException {
MQClientException exception = null;
for (int i = 0; i < 3; i++) {
try {
this.mQClientFactory.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
break;
} catch (Exception e) {
if (2 == i) {
exception = new MQClientException("create topic to broker exception", e);
}
}
}
if (exception != null) {
throw exception;
}
}
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
......
......@@ -2708,7 +2708,7 @@ public class MQClientAPIImpl {
public TopicConfigAndQueueMapping getTopicConfig(final String brokerAddr, String topic,
long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
header.setTopic(topic);
header.setWithMapping(true);
......@@ -2728,11 +2728,11 @@ public class MQClientAPIImpl {
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
throw new MQBrokerException(response.getCode(), response.getRemark());
}
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail topicQueueMappingDetail, boolean force,
final long timeoutMillis) throws RemotingException, InterruptedException, MQClientException {
final long timeoutMillis) throws RemotingException, InterruptedException, MQBrokerException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
......@@ -2757,6 +2757,6 @@ public class MQClientAPIImpl {
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicQueueMappingBody extends RemotingSerializable {
private boolean force;
private int prevGen;
private TopicQueueMappingDetail mappingDetail;
public int getPrevGen() {
return prevGen;
}
public void setPrevGen(int prevGen) {
this.prevGen = prevGen;
}
public TopicQueueMappingDetail getMappingDetail() {
return mappingDetail;
}
public void setMappingDetail(TopicQueueMappingDetail mappingDetail) {
this.mappingDetail = mappingDetail;
}
public boolean isForce() {
return force;
}
public void setForce(boolean force) {
this.force = force;
}
}
......@@ -100,6 +100,18 @@ public class LogicQueueMappingItem {
this.logicOffset = logicOffset;
}
public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}
public void setTimeOfStart(long timeOfStart) {
this.timeOfStart = timeOfStart;
}
public void setTimeOfEnd(long timeOfEnd) {
this.timeOfEnd = timeOfEnd;
}
@Override
public String toString() {
return "LogicQueueMappingItem{" +
......
......@@ -29,6 +29,13 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail() {
}
public TopicQueueMappingDetail(String topic, int totalQueues, String bname, long epoch) {
super(topic, totalQueues, bname, epoch);
buildIdMap();
......@@ -120,6 +127,10 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return hostedQueues;
}
public void setHostedQueues(ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> hostedQueues) {
this.hostedQueues = hostedQueues;
}
public boolean checkIfAsPhysical(Integer globalId) {
List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
return mappingItems == null
......
......@@ -32,6 +32,10 @@ public class TopicQueueMappingInfo extends RemotingSerializable {
//register to broker to construct the route
transient ConcurrentMap<Integer/*logicId*/, Integer/*physicalId*/> currIdMap = new ConcurrentHashMap<Integer, Integer>();
public TopicQueueMappingInfo() {
}
public TopicQueueMappingInfo(String topic, int totalQueues, String bname, long epoch) {
this.topic = topic;
this.totalQueues = totalQueues;
......
......@@ -4,8 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Test;
......@@ -20,13 +19,11 @@ public class TopicQueueMappingTest {
System.out.println(File.separator);
}
@Test
public void testJsonSerialize() {
LogicQueueMappingItem mappingItem = new LogicQueueMappingItem(1, 2, "broker01", 33333333333333333L, 44444444444444444L, 555555555555555555L, 6666666666666666L, 77777777777777777L);
String mappingItemJson = JSON.toJSONString(mappingItem) ;
System.out.println(mappingItemJson);
{
Map<String, Object> mappingItemMap = JSON.parseObject(mappingItemJson, Map.class);
Assert.assertEquals(8, mappingItemMap.size());
Assert.assertEquals(mappingItemMap.get("bname"), mappingItem.getBname());
......@@ -37,16 +34,30 @@ public class TopicQueueMappingTest {
Assert.assertEquals(mappingItemMap.get("endOffset"), mappingItem.getEndOffset());
Assert.assertEquals(mappingItemMap.get("timeOfStart"), mappingItem.getTimeOfStart());
Assert.assertEquals(mappingItemMap.get("timeOfEnd"), mappingItem.getTimeOfEnd());
}
{
String mappingItemJson2 = RemotingSerializable.toJson(RemotingSerializable.decode(mappingItemJson.getBytes(), LogicQueueMappingItem.class), false);
Assert.assertEquals(mappingItemJson, mappingItemJson2);
}
TopicQueueMappingDetail mappingDetail = new TopicQueueMappingDetail("test", 1, "broker01", System.currentTimeMillis());
mappingDetail.putMappingInfo(0, ImmutableList.of(mappingItem));
String mappingDetailJson = JSON.toJSONString(mappingDetail);
{
Map mappingDetailMap = JSON.parseObject(mappingDetailJson);
Assert.assertFalse(mappingDetailMap.containsKey("prevIdMap"));
Assert.assertFalse(mappingDetailMap.containsKey("currIdMap"));
Assert.assertEquals(4, mappingDetailMap.size());
Assert.assertEquals(6, mappingDetailMap.size());
Assert.assertEquals(1, ((JSONObject) mappingDetailMap.get("hostedQueues")).size());
Assert.assertEquals(1, ((JSONArray)((JSONObject) mappingDetailMap.get("hostedQueues")).get("0")).size());
}
{
System.out.println(mappingDetailJson);
TopicQueueMappingDetail detailFromJson = RemotingSerializable.decode(mappingDetailJson.getBytes(), TopicQueueMappingDetail.class);
System.out.println(JSON.toJSONString(detailFromJson));
//Assert.assertEquals(1, detailFromJson.getHostedQueues().size());
//Assert.assertEquals(1, detailFromJson.getHostedQueues().get("0").size());
}
}
}
......@@ -57,6 +57,7 @@ public class StaticTopicIT extends BaseConf {
String broker = entry.getKey();
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = entry.getValue();
System.out.println(configMapping.getMappingDetail().toJson());
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), false);
}
}
......
......@@ -218,12 +218,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
@Override
public TopicConfig examineTopicConfig(String addr,
String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
String topic) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
}
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.examineTopicConfigAll(clientMetadata, topic);
}
......@@ -668,7 +668,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
public void createStaticTopic(String addr, String defaultTopic, TopicConfig topicConfig, TopicQueueMappingDetail mappingDetail, boolean force) throws RemotingException, InterruptedException, MQBrokerException {
this.defaultMQAdminExtImpl.createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
}
......
......@@ -211,7 +211,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, InterruptedException, MQClientException {
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
......@@ -257,7 +258,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
public TopicConfig examineTopicConfig(String addr, String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicConfig(addr, topic, timeoutMillis);
}
......@@ -1105,8 +1106,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws MQClientException {
this.mqClientInstance.getMQAdminImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force);
public void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().createStaticTopic(addr, defaultTopic, topicConfig, mappingDetail, force, timeoutMillis);
}
......@@ -1170,13 +1171,23 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException {
public Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = new HashMap<>();
boolean getFromBrokers = false;
TopicRouteData routeData = null;
try {
TopicRouteData routeData = examineTopicRouteInfo(topic);
clientMetadata.freshTopicRoute(topic, routeData);
routeData = examineTopicRouteInfo(topic);
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw new MQBrokerException(exception.getResponseCode(), exception.getErrorMessage());
} else {
getFromBrokers = true;
}
}
if (!getFromBrokers) {
if (routeData != null
&& !routeData.getQueueDatas().isEmpty()) {
clientMetadata.freshTopicRoute(topic, routeData);
for (QueueData queueData: routeData.getQueueDatas()) {
String bname = queueData.getBrokerName();
String addr = clientMetadata.findMasterBrokerAddr(bname);
......@@ -1186,30 +1197,22 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQClientException exception) {
} catch (MQBrokerException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
}
}
}
} catch (MQClientException exception) {
if (exception.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception;
}
} else {
log.info("The topic {} dose not exist in nameserver, so check it from all brokers", topic);
//if cannot get from nameserver, then check all the brokers
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (clusterInfo != null
&& clusterInfo.getBrokerAddrTable() != null) {
clientMetadata.refreshClusterInfo(clusterInfo);
}
}catch (MQBrokerException e) {
throw new MQClientException(e.getResponseCode(), e.getMessage());
}
for (Entry<String, HashMap<Long, String>> entry : clientMetadata.getBrokerAddrTable().entrySet()) {
String bname = entry.getKey();
HashMap<Long, String> map = entry.getValue();
......@@ -1221,12 +1224,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
if (mapping != null) {
brokerConfigMap.put(bname, mapping);
}
} catch (MQClientException clientException) {
if (clientException.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw clientException;
} catch (MQBrokerException exception1) {
if (exception1.getResponseCode() != ResponseCode.TOPIC_NOT_EXIST) {
throw exception1;
}
}
}
}
}
......
......@@ -108,8 +108,6 @@ public interface MQAdminExt extends MQAdmin {
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
TopicConfig examineTopicConfig(final String addr,
final String topic) throws InterruptedException, MQClientException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
......@@ -344,9 +342,12 @@ public interface MQAdminExt extends MQAdmin {
LogicalQueueRouteData toQueueRouteData) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQClientException;
TopicConfig examineTopicConfig(final String addr,
final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void createStaticTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final TopicQueueMappingDetail mappingDetail, final boolean force) throws RemotingException, InterruptedException, MQBrokerException;
Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQClientException;
Map<String, TopicConfigAndQueueMapping> examineTopicConfigAll(ClientMetadata clientMetadata, String topic) throws RemotingException, InterruptedException, MQBrokerException;
void remappingStaticTopic(ClientMetadata clientMetadata, String topic, Set<String> brokersToMapIn, Set<String> brokersToMapOut, Map<String, TopicConfigAndQueueMapping> brokerConfigMap, int blockSeqSize, boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册