提交 977b1a85 编写于 作者: M massakam 提交者: Sijie Guo

Add perPartition parameter to partitioned-stats API (#4639)

### Motivation

Currently, the partitioned-stats API response includes stats for each partition. However, if the number of partitions and clients is large, the size of the response will be very large. In such cases, it is useful to have a query parameter to get a response that does not include stats for each partition.

```sh
$ curl -s http://localhost:8080/admin/persistent/sample/standalone/ns1/pt1/partitioned-stats | jq .

{
  "msgRateIn": 0,
  "msgThroughputIn": 0,
  "msgRateOut": 0,
  "msgThroughputOut": 0,
  "averageMsgSize": 0,
  "storageSize": 0,
  "publishers": [],
  "subscriptions": {
    "sub1": {
      "msgRateOut": 0,
      "msgThroughputOut": 0,
      "msgRateRedeliver": 0,
      "msgBacklog": 0,
      "blockedSubscriptionOnUnackedMsgs": false,
      "msgDelayed": 0,
      "unackedMessages": 0,
      "msgRateExpired": 0,
      "consumers": [],
      "isReplicated": false
    }
  },
  "replication": {},
  "metadata": {
    "partitions": 2
  },
  "partitions": {
    "persistent://sample/standalone/ns1/pt1-partition-1": {
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "msgRateOut": 0,
      "msgThroughputOut": 0,
      "averageMsgSize": 0,
      "storageSize": 0,
      "publishers": [],
      "subscriptions": {
        "sub1": {
          "msgRateOut": 0,
          "msgThroughputOut": 0,
          "msgRateRedeliver": 0,
          "msgBacklog": 0,
          "blockedSubscriptionOnUnackedMsgs": false,
          "msgDelayed": 0,
          "unackedMessages": 0,
          "msgRateExpired": 0,
          "consumers": [],
          "isReplicated": false
        }
      },
      "replication": {},
      "deduplicationStatus": "Disabled"
    },
    "persistent://sample/standalone/ns1/pt1-partition-0": {
      "msgRateIn": 0,
      "msgThroughputIn": 0,
      "msgRateOut": 0,
      "msgThroughputOut": 0,
      "averageMsgSize": 0,
      "storageSize": 0,
      "publishers": [],
      "subscriptions": {
        "sub1": {
          "msgRateOut": 0,
          "msgThroughputOut": 0,
          "msgRateRedeliver": 0,
          "msgBacklog": 0,
          "blockedSubscriptionOnUnackedMsgs": false,
          "msgDelayed": 0,
          "unackedMessages": 0,
          "msgRateExpired": 0,
          "consumers": [],
          "isReplicated": false
        }
      },
      "replication": {},
      "deduplicationStatus": "Disabled"
    }
  }
}
```

### Modifications

Added query parameter named `perPartition` to the partitioned-stats API. The default value is true.
上级 b306552f
......@@ -641,7 +641,8 @@ public class PersistentTopicsBase extends AdminResource {
}, null);
}
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative) {
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
boolean perPartition) {
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
......@@ -669,14 +670,16 @@ public class PersistentTopicsBase extends AdminResource {
if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
try {
stats.add(statFuture.get());
stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
if (perPartition) {
stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return null;
}
}
}
if (stats.partitions.isEmpty()) {
if (perPartition && stats.partitions.isEmpty()) {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
try {
boolean zkPathExists = zkPathExists(path);
......
......@@ -277,10 +277,11 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
......
......@@ -470,11 +470,13 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Get per partition stats")
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
......
......@@ -513,6 +513,7 @@ public class TopicsImpl extends BaseResource implements Topics {
boolean perPartition) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-stats");
path = path.queryParam("perPartition", perPartition);
final CompletableFuture<PartitionedTopicStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PartitionedTopicStats>() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册