提交 198e154e 编写于 作者: Y Yuto Furuta 提交者: Jia Zhai

add timeout to internal rest api (#4762)

(cherry picked from commit 9605aede)
上级 d2a4ca4f
......@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
......@@ -52,12 +54,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
@Override
public void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
try {
createPartitionedTopicAsync(topic, numPartitions).get();
createPartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -72,12 +76,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
@Override
public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) throws PulsarAdminException {
try {
return getPartitionedTopicMetadataAsync(topic).get();
return getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -105,12 +111,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
@Override
public NonPersistentTopicStats getStats(String topic) throws PulsarAdminException {
try {
return getStatsAsync(topic).get();
return getStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -138,12 +146,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
@Override
public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException {
try {
return getInternalStatsAsync(topic).get();
return getInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -171,12 +181,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
@Override
public void unload(String topic) throws PulsarAdminException {
try {
unloadAsync(topic).get();
unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -190,12 +202,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
@Override
public List<String> getListInBundle(String namespace, String bundleRange) throws PulsarAdminException {
try {
return getListInBundleAsync(namespace, bundleRange).get();
return getListInBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -221,12 +235,14 @@ public class NonPersistentTopicsImpl extends BaseResource implements NonPersiste
@Override
public List<String> getList(String namespace) throws PulsarAdminException {
try {
return getListAsync(namespace).get();
return getListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......
......@@ -131,12 +131,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public List<String> getListInBundle(String namespace, String bundleRange) throws PulsarAdminException {
try {
return getListInBundleAsync(namespace, bundleRange).get();
return getListInBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -197,24 +199,28 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
try {
createPartitionedTopicAsync(topic, numPartitions).get();
createPartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public void createNonPartitionedTopic(String topic) throws PulsarAdminException {
try {
createNonPartitionedTopicAsync(topic).get();
createNonPartitionedTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -236,12 +242,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void updatePartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
try {
updatePartitionedTopicAsync(topic, numPartitions).get();
updatePartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -256,12 +264,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) throws PulsarAdminException {
try {
return getPartitionedTopicMetadataAsync(topic).get();
return getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -294,12 +304,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void deletePartitionedTopic(String topic, boolean force) throws PulsarAdminException {
try {
deletePartitionedTopicAsync(topic, force).get();
deletePartitionedTopicAsync(topic, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -319,12 +331,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void delete(String topic, boolean force) throws PulsarAdminException {
try {
deleteAsync(topic, force).get();
deleteAsync(topic, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -339,12 +353,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void unload(String topic) throws PulsarAdminException {
try {
unloadAsync(topic).get();
unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -358,12 +374,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public List<String> getSubscriptions(String topic) throws PulsarAdminException {
try {
return getSubscriptionsAsync(topic).get();
return getSubscriptionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......@@ -595,12 +613,14 @@ public class TopicsImpl extends BaseResource implements Topics {
@Override
public void skipAllMessages(String topic, String subName) throws PulsarAdminException {
try {
skipAllMessagesAsync(topic, subName).get();
skipAllMessagesAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册