DefaultMQAdminExt.java 25.5 KB
Newer Older
1
/*
2 3 4 5 6 7 8
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10 11 12 13 14 15 16
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 18
package org.apache.rocketmq.tools.admin;

Y
yukon 已提交
19 20 21 22 23
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
24 25 26 27
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
28
import org.apache.rocketmq.common.AclConfig;
29
import org.apache.rocketmq.common.PlainAccessConfig;
30 31 32 33 34 35
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
Y
yukon 已提交
36
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
37
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
Y
yukon 已提交
38 39 40 41 42 43 44 45
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
46
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
Y
yukon 已提交
47 48 49 50
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
51 52
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
53
import org.apache.rocketmq.common.topic.TopicValidator;
54
import org.apache.rocketmq.remoting.RPCHook;
Y
yukon 已提交
55 56 57 58 59
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
60
import org.apache.rocketmq.tools.admin.api.MessageTrack;
61 62 63 64

public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
    private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
    private String adminExtGroup = "admin_ext_group";
65
    private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
    private long timeoutMillis = 5000;

    public DefaultMQAdminExt() {
        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, timeoutMillis);
    }

    public DefaultMQAdminExt(long timeoutMillis) {
        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, timeoutMillis);
    }

    public DefaultMQAdminExt(RPCHook rpcHook) {
        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis);
    }

    public DefaultMQAdminExt(RPCHook rpcHook, long timeoutMillis) {
        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis);
    }

    public DefaultMQAdminExt(final String adminExtGroup) {
        this.adminExtGroup = adminExtGroup;
        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, timeoutMillis);
    }

    public DefaultMQAdminExt(final String adminExtGroup, long timeoutMillis) {
        this.adminExtGroup = adminExtGroup;
        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, timeoutMillis);
    }

    @Override
    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
        createTopic(key, newTopic, queueNum, 0);
    }

    @Override
    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
        defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
    }

    @Override
    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
        return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
    }

    @Override
    public long maxOffset(MessageQueue mq) throws MQClientException {
        return defaultMQAdminExtImpl.maxOffset(mq);
    }

    @Override
    public long minOffset(MessageQueue mq) throws MQClientException {
        return defaultMQAdminExtImpl.minOffset(mq);
    }

    @Override
    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
        return defaultMQAdminExtImpl.earliestMsgStoreTime(mq);
    }

    @Override
Y
yukon 已提交
125 126
    public MessageExt viewMessage(
        String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
127 128 129 130
        return defaultMQAdminExtImpl.viewMessage(offsetMsgId);
    }

    @Override
131 132 133
    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
        throws MQClientException, InterruptedException {

134 135 136
        return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
    }

137
    public QueryResult queryMessageByUniqKey(String topic, String key, int maxNum, long begin, long end)
138 139 140 141 142
        throws MQClientException, InterruptedException {

        return defaultMQAdminExtImpl.queryMessageByUniqKey(topic, key, maxNum, begin, end);
    }

143 144 145 146 147 148 149 150 151 152 153
    @Override
    public void start() throws MQClientException {
        defaultMQAdminExtImpl.start();
    }

    @Override
    public void shutdown() {
        defaultMQAdminExtImpl.shutdown();
    }

    @Override
Y
yukon 已提交
154 155
    public void updateBrokerConfig(String brokerAddr,
        Properties properties) throws RemotingConnectException, RemotingSendRequestException,
Y
yukon 已提交
156
        RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
157 158 159 160 161
        defaultMQAdminExtImpl.updateBrokerConfig(brokerAddr, properties);
    }

    @Override
    public Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
Y
yukon 已提交
162
        RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
163 164 165 166 167
        return defaultMQAdminExtImpl.getBrokerConfig(brokerAddr);
    }

    @Override
    public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
Y
yukon 已提交
168
        InterruptedException, MQClientException {
169 170 171
        defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config);
    }

172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
    @Override
    public void createAndUpdatePlainAccessConfig(String addr,
        PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        defaultMQAdminExtImpl.createAndUpdatePlainAccessConfig(addr, config);
    }

    @Override public void deletePlainAccessConfig(String addr,
        String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        defaultMQAdminExtImpl.deletePlainAccessConfig(addr, accessKey);
    }

    @Override public void updateGlobalWhiteAddrConfig(String addr,
        String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        defaultMQAdminExtImpl.updateGlobalWhiteAddrConfig(addr, globalWhiteAddrs);
    }

    @Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
        String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr);
    }

193 194 195 196 197
    @Override public AclConfig examineBrokerClusterAclConfig(
        String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        return defaultMQAdminExtImpl.examineBrokerClusterAclConfig(addr);
    }

198
    @Override
Y
yukon 已提交
199 200
    public void createAndUpdateSubscriptionGroupConfig(String addr,
        SubscriptionGroupConfig config) throws RemotingException,
Y
yukon 已提交
201
        MQBrokerException, InterruptedException, MQClientException {
202 203 204 205
        defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig(addr, config);
    }

    @Override
206 207
    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group)
        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
208 209 210 211
        return defaultMQAdminExtImpl.examineSubscriptionGroupConfig(addr, group);
    }

    @Override
212
    public TopicConfig examineTopicConfig(String addr, String topic) throws RemotingException, InterruptedException, MQBrokerException {
213 214 215 216
        return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
    }

    @Override
Y
yukon 已提交
217 218
    public TopicStatsTable examineTopicStats(
        String topic) throws RemotingException, MQClientException, InterruptedException,
Y
yukon 已提交
219
        MQBrokerException {
220 221 222 223 224 225 226 227 228
        return defaultMQAdminExtImpl.examineTopicStats(topic);
    }

    @Override
    public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
        return this.defaultMQAdminExtImpl.fetchAllTopicList();
    }

    @Override
Y
yukon 已提交
229 230
    public TopicList fetchTopicsByCLuster(
        String clusterName) throws RemotingException, MQClientException, InterruptedException {
231 232 233 234
        return this.defaultMQAdminExtImpl.fetchTopicsByCLuster(clusterName);
    }

    @Override
Y
yukon 已提交
235 236
    public KVTable fetchBrokerRuntimeStats(
        final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
Y
yukon 已提交
237
        RemotingTimeoutException, InterruptedException, MQBrokerException {
238 239 240 241
        return this.defaultMQAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr);
    }

    @Override
Y
yukon 已提交
242 243
    public ConsumeStats examineConsumeStats(
        String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
Y
yukon 已提交
244
        MQBrokerException {
245 246 247 248
        return examineConsumeStats(consumerGroup, null);
    }

    @Override
Y
yukon 已提交
249 250
    public ConsumeStats examineConsumeStats(String consumerGroup,
        String topic) throws RemotingException, MQClientException,
Y
yukon 已提交
251
        InterruptedException, MQBrokerException {
252 253 254 255 256
        return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
    }

    @Override
    public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
Y
yukon 已提交
257
        RemotingSendRequestException, MQBrokerException {
258 259 260 261
        return defaultMQAdminExtImpl.examineBrokerClusterInfo();
    }

    @Override
Y
yukon 已提交
262 263
    public TopicRouteData examineTopicRouteInfo(
        String topic) throws RemotingException, MQClientException, InterruptedException {
264 265 266 267
        return defaultMQAdminExtImpl.examineTopicRouteInfo(topic);
    }

    @Override
Y
yukon 已提交
268 269
    public ConsumerConnection examineConsumerConnectionInfo(
        String consumerGroup) throws InterruptedException, MQBrokerException,
Y
yukon 已提交
270
        RemotingException, MQClientException {
271 272 273 274
        return defaultMQAdminExtImpl.examineConsumerConnectionInfo(consumerGroup);
    }

    @Override
Y
yukon 已提交
275 276
    public ProducerConnection examineProducerConnectionInfo(String producerGroup,
        final String topic) throws RemotingException,
Y
yukon 已提交
277
        MQClientException, InterruptedException, MQBrokerException {
278 279 280 281 282 283 284 285 286 287
        return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
    }

    @Override
    public List<String> getNameServerAddressList() {
        return this.defaultMQAdminExtImpl.getNameServerAddressList();
    }

    @Override
    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
Y
yukon 已提交
288
        RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
289 290 291
        return defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName);
    }

C
coder-zzzz 已提交
292 293 294
    @Override
    public int addWritePermOfBroker(String namesrvAddr, String brokerName) throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
        return defaultMQAdminExtImpl.addWritePermOfBroker(namesrvAddr, brokerName);
295 296 297 298 299 300 301 302
    }

    @Override
    public void putKVConfig(String namespace, String key, String value) {
        defaultMQAdminExtImpl.putKVConfig(namespace, key, value);
    }

    @Override
Y
yukon 已提交
303 304
    public String getKVConfig(String namespace,
        String key) throws RemotingException, MQClientException, InterruptedException {
305 306 307 308
        return defaultMQAdminExtImpl.getKVConfig(namespace, key);
    }

    @Override
Y
yukon 已提交
309 310
    public KVTable getKVListByNamespace(
        String namespace) throws RemotingException, MQClientException, InterruptedException {
311 312 313 314
        return defaultMQAdminExtImpl.getKVListByNamespace(namespace);
    }

    @Override
Y
yukon 已提交
315 316
    public void deleteTopicInBroker(Set<String> addrs,
        String topic) throws RemotingException, MQBrokerException, InterruptedException,
Y
yukon 已提交
317
        MQClientException {
318 319 320 321
        defaultMQAdminExtImpl.deleteTopicInBroker(addrs, topic);
    }

    @Override
Y
yukon 已提交
322 323
    public void deleteTopicInNameServer(Set<String> addrs,
        String topic) throws RemotingException, MQBrokerException, InterruptedException,
Y
yukon 已提交
324
        MQClientException {
325 326 327 328
        defaultMQAdminExtImpl.deleteTopicInNameServer(addrs, topic);
    }

    @Override
Y
yukon 已提交
329 330
    public void deleteSubscriptionGroup(String addr,
        String groupName) throws RemotingException, MQBrokerException, InterruptedException,
Y
yukon 已提交
331
        MQClientException {
332 333 334
        defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName);
    }

335 336 337 338 339 340 341
    @Override
    public void deleteSubscriptionGroup(String addr,
        String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException,
        MQClientException {
        defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName, removeOffset);
    }

342
    @Override
Y
yukon 已提交
343 344
    public void createAndUpdateKvConfig(String namespace, String key,
        String value) throws RemotingException, MQBrokerException,
Y
yukon 已提交
345
        InterruptedException, MQClientException {
346 347 348 349
        defaultMQAdminExtImpl.createAndUpdateKvConfig(namespace, key, value);
    }

    @Override
Y
yukon 已提交
350 351
    public void deleteKvConfig(String namespace,
        String key) throws RemotingException, MQBrokerException, InterruptedException,
Y
yukon 已提交
352
        MQClientException {
353 354 355
        defaultMQAdminExtImpl.deleteKvConfig(namespace, key);
    }

356
    @Override
Y
yukon 已提交
357 358
    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
        boolean force)
Y
yukon 已提交
359
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
360 361 362 363 364
        return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
    }

    @Override
    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
Y
yukon 已提交
365
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
366 367 368
        return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
    }

Y
yukon 已提交
369 370
    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
        boolean isC)
Y
yukon 已提交
371
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
372 373 374 375
        return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC);
    }

    @Override
Y
yukon 已提交
376 377
    public void resetOffsetNew(String consumerGroup, String topic,
        long timestamp) throws RemotingException, MQBrokerException,
Y
yukon 已提交
378
        InterruptedException, MQClientException {
379 380 381 382
        this.defaultMQAdminExtImpl.resetOffsetNew(consumerGroup, topic, timestamp);
    }

    @Override
Y
yukon 已提交
383 384
    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
        String clientAddr) throws RemotingException,
Y
yukon 已提交
385
        MQBrokerException, InterruptedException, MQClientException {
386 387 388 389
        return defaultMQAdminExtImpl.getConsumeStatus(topic, group, clientAddr);
    }

    @Override
Y
yukon 已提交
390 391
    public void createOrUpdateOrderConf(String key, String value,
        boolean isCluster) throws RemotingException, MQBrokerException,
Y
yukon 已提交
392
        InterruptedException, MQClientException {
393 394 395 396
        defaultMQAdminExtImpl.createOrUpdateOrderConf(key, value, isCluster);
    }

    @Override
Y
yukon 已提交
397 398
    public GroupList queryTopicConsumeByWho(
        String topic) throws InterruptedException, MQBrokerException, RemotingException,
Y
yukon 已提交
399
        MQClientException {
400 401 402 403
        return this.defaultMQAdminExtImpl.queryTopicConsumeByWho(topic);
    }

    @Override
Y
yukon 已提交
404 405
    public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
        final String group) throws InterruptedException, MQBrokerException,
Y
yukon 已提交
406
        RemotingException, MQClientException {
407 408 409 410
        return this.defaultMQAdminExtImpl.queryConsumeTimeSpan(topic, group);
    }

    @Override
Y
yukon 已提交
411 412
    public boolean cleanExpiredConsumerQueue(
        String cluster) throws RemotingConnectException, RemotingSendRequestException,
Y
yukon 已提交
413
        RemotingTimeoutException, MQClientException, InterruptedException {
414 415 416 417
        return defaultMQAdminExtImpl.cleanExpiredConsumerQueue(cluster);
    }

    @Override
Y
yukon 已提交
418 419
    public boolean cleanExpiredConsumerQueueByAddr(
        String addr) throws RemotingConnectException, RemotingSendRequestException,
Y
yukon 已提交
420
        RemotingTimeoutException, MQClientException, InterruptedException {
421 422 423 424 425
        return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr);
    }

    @Override
    public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
Y
yukon 已提交
426
        RemotingTimeoutException, MQClientException, InterruptedException {
427 428 429 430 431
        return defaultMQAdminExtImpl.cleanUnusedTopicByAddr(cluster);
    }

    @Override
    public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
Y
yukon 已提交
432
        RemotingTimeoutException, MQClientException, InterruptedException {
433 434 435 436
        return defaultMQAdminExtImpl.cleanUnusedTopicByAddr(addr);
    }

    @Override
Y
yukon 已提交
437 438
    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
        boolean jstack) throws RemotingException,
Y
yukon 已提交
439
        MQClientException, InterruptedException {
440 441 442 443 444
        return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack);
    }

    @Override
    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
Y
yukon 已提交
445
        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
446 447 448 449
        return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId);
    }

    @Override
Y
yukon 已提交
450 451
    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId,
        final String topic,
Y
yukon 已提交
452
        final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
453 454 455 456
        return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
    }

    @Override
Y
yukon 已提交
457 458
    public List<MessageTrack> messageTrackDetail(
        MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
Y
yukon 已提交
459
        MQBrokerException {
460 461 462 463
        return this.defaultMQAdminExtImpl.messageTrackDetail(msg);
    }

    @Override
Y
yukon 已提交
464 465
    public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
        boolean isOffline) throws RemotingException,
Y
yukon 已提交
466
        MQClientException, InterruptedException, MQBrokerException {
467 468 469 470
        this.defaultMQAdminExtImpl.cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
    }

    @Override
Y
yukon 已提交
471 472
    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
        String statsKey) throws RemotingConnectException,
Y
yukon 已提交
473
        RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
474 475 476 477 478
        return this.defaultMQAdminExtImpl.viewBrokerStatsData(brokerAddr, statsName, statsKey);
    }

    @Override
    public Set<String> getClusterList(String topic) throws RemotingConnectException, RemotingSendRequestException,
Y
yukon 已提交
479
        RemotingTimeoutException, MQClientException, InterruptedException {
480 481 482 483
        return this.defaultMQAdminExtImpl.getClusterList(topic);
    }

    @Override
Y
yukon 已提交
484 485 486
    public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder,
        long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
        RemotingTimeoutException, MQClientException, InterruptedException {
487 488 489 490
        return this.defaultMQAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
    }

    @Override
Y
yukon 已提交
491 492
    public Set<String> getTopicClusterList(
        final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
493 494 495 496
        return this.defaultMQAdminExtImpl.getTopicClusterList(topic);
    }

    @Override
Y
yukon 已提交
497 498 499
    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
        RemotingConnectException, MQBrokerException {
500 501 502 503
        return this.defaultMQAdminExtImpl.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
    }

    @Override
504
    public SubscriptionGroupWrapper getUserSubscriptionGroup(final String brokerAddr,
Y
yukon 已提交
505 506
        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
        RemotingConnectException, MQBrokerException {
507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
        return this.defaultMQAdminExtImpl.getUserSubscriptionGroup(brokerAddr, timeoutMillis);
    }

    @Override
    public TopicConfigSerializeWrapper getAllTopicConfig(final String brokerAddr,
        long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
        RemotingConnectException, MQBrokerException {
        return this.defaultMQAdminExtImpl.getAllTopicConfig(brokerAddr, timeoutMillis);
    }

    @Override
    public TopicConfigSerializeWrapper getUserTopicConfig(final String brokerAddr, final boolean specialTopic,
        long timeoutMillis) throws InterruptedException, RemotingException,
        MQBrokerException, MQClientException {
        return this.defaultMQAdminExtImpl.getUserTopicConfig(brokerAddr, specialTopic, timeoutMillis);
522 523 524
    }

    /* (non-Javadoc)
525
     * @see org.apache.rocketmq.client.MQAdmin#queryMessageByUniqKey(java.lang.String, java.lang.String)
526 527 528
     */
    @Override
    public MessageExt viewMessage(String topic, String msgId)
Y
yukon 已提交
529
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549
        return this.defaultMQAdminExtImpl.viewMessage(topic, msgId);
    }

    public String getAdminExtGroup() {
        return adminExtGroup;
    }

    public void setAdminExtGroup(String adminExtGroup) {
        this.adminExtGroup = adminExtGroup;
    }

    public String getCreateTopicKey() {
        return createTopicKey;
    }

    public void setCreateTopicKey(String createTopicKey) {
        this.createTopicKey = createTopicKey;
    }

    @Override
Y
yukon 已提交
550 551
    public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
        long offset) throws RemotingException, InterruptedException, MQBrokerException {
552 553 554 555 556
        this.defaultMQAdminExtImpl.updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
    }

    @Override
    public void updateNameServerConfig(final Properties properties, final List<String> nameServers)
Y
yukon 已提交
557 558 559
        throws InterruptedException, RemotingConnectException,
        UnsupportedEncodingException, MQBrokerException, RemotingTimeoutException,
        MQClientException, RemotingSendRequestException {
560 561 562 563 564
        this.defaultMQAdminExtImpl.updateNameServerConfig(properties, nameServers);
    }

    @Override
    public Map<String, Properties> getNameServerConfig(final List<String> nameServers)
Y
yukon 已提交
565 566 567
        throws InterruptedException, RemotingTimeoutException,
        RemotingSendRequestException, RemotingConnectException, MQClientException,
        UnsupportedEncodingException {
568 569
        return this.defaultMQAdminExtImpl.getNameServerConfig(nameServers);
    }
570 571

    @Override
Y
yukon 已提交
572 573
    public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
        int count, String consumerGroup)
574 575 576 577 578
        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
        return this.defaultMQAdminExtImpl.queryConsumeQueue(
            brokerAddr, topic, queueId, index, count, consumerGroup
        );
    }
579 580 581 582 583 584 585 586 587 588 589 590 591

    @Override
    public boolean resumeCheckHalfMessage(String msgId)
            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(msgId);
    }

    @Override
    public boolean resumeCheckHalfMessage(String topic,
            String msgId)
            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        return this.defaultMQAdminExtImpl.resumeCheckHalfMessage(topic, msgId);
    }
592
}