未验证 提交 ad9d6162 编写于 作者: K kezhenxu94 提交者: GitHub

Add `socketTimeout` back to the new implementation (#7798)

上级 24aa9b1b
......@@ -117,6 +117,7 @@ public class ElasticSearchClient implements Client, HealthCheckable {
.endpoints(clusterNodes.split(","))
.protocol(protocol)
.connectTimeout(connectTimeout)
.socketTimeout(socketTimeout)
.numHttpClientThread(numHttpClientThread)
.healthyListener(healthy -> {
if (healthy) {
......
......@@ -63,6 +63,8 @@ public final class ElasticSearchBuilder {
private Duration connectTimeout = Duration.ofMillis(500);
private Duration socketTimeout = Duration.ofSeconds(30);
private Consumer<Boolean> healthyListener;
private int numHttpClientThread;
......@@ -117,6 +119,12 @@ public final class ElasticSearchBuilder {
return this;
}
public ElasticSearchBuilder socketTimeout(int socketTimeout) {
checkArgument(socketTimeout > 0, "socketTimeout must be positive");
this.socketTimeout = Duration.ofMillis(socketTimeout);
return this;
}
public ElasticSearchBuilder healthyListener(Consumer<Boolean> healthyListener) {
requireNonNull(healthyListener, "healthyListener");
this.healthyListener = healthyListener;
......@@ -138,6 +146,7 @@ public final class ElasticSearchBuilder {
final ClientFactoryBuilder factoryBuilder =
ClientFactory.builder()
.connectTimeout(connectTimeout)
.idleTimeout(socketTimeout)
.useHttp2Preface(false)
.workerGroup(numHttpClientThread > 0 ? numHttpClientThread : NUM_PROC);
......
......@@ -84,9 +84,10 @@ public final class BulkProcessor {
return this;
}
@SneakyThrows
private void internalAdd(Object request) {
requireNonNull(request, "request");
requests.add(request);
requests.put(request);
flushIfNeeded();
}
......@@ -120,6 +121,10 @@ public final class BulkProcessor {
private CompletableFuture<Void> doFlush(final List<Object> batch) {
log.debug("Executing bulk with {} requests", batch.size());
if (batch.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
try {
final RequestFactory rf = v.requestFactory();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册