未验证 提交 c5cbd3a1 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Optimize `flushInterval` of ElasticSearch BulkProcessor (#10135)

Optimize `flushInterval` of ElasticSearch BulkProcessor to avoid extra periodical flush in the continuous bulk streams.
上级 3a3851f0
......@@ -33,6 +33,7 @@
* Remove the dependency of `refresh_interval` of ElasticSearch indices from `elasticsearch/flushInterval` config. Now,
it uses `core/persistentPeriod` + 5s as `refresh_interval` for all indices instead.
* Change `elasticsearch/flushInterval` to 5s(was 15s).
* Optimize `flushInterval` of ElasticSearch BulkProcessor to avoid extra periodical flush in the continuous bulk streams.
#### UI
......
......@@ -48,6 +48,8 @@ public final class BulkProcessor {
private final AtomicReference<ElasticSearch> es;
private final int bulkActions;
private final Semaphore semaphore;
private final long flushInternalInMillis;
private volatile long lastFlushTS = 0;
public static BulkProcessorBuilder builder() {
return new BulkProcessorBuilder();
......@@ -72,9 +74,12 @@ public final class BulkProcessor {
scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
scheduler.setRemoveOnCancelPolicy(true);
flushInternalInMillis = flushInterval.getSeconds() * 1000;
scheduler.scheduleWithFixedDelay(
new RunnableWithExceptionProtection(this::flush,
t -> log.error("flush data to ES failure:", t)), 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
new RunnableWithExceptionProtection(
this::doPeriodicalFlush,
t -> log.error("flush data to ES failure:", t)
), 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
}
public CompletableFuture<Void> add(IndexRequest request) {
......@@ -101,6 +106,15 @@ public final class BulkProcessor {
}
}
private void doPeriodicalFlush() {
if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 2) {
// Run periodical flush if there is no `flushIfNeeded` executed in the second half of the flush period.
// Otherwise, wait for the next round. By default, the last 2 seconds of the 5s period.
// This could avoid periodical flush running among bulks(controlled by bulkActions).
flush();
}
}
public void flush() {
if (requests.isEmpty()) {
return;
......@@ -119,6 +133,8 @@ public final class BulkProcessor {
final CompletableFuture<Void> flush = doFlush(batch);
flush.whenComplete((ignored1, ignored2) -> semaphore.release());
flush.join();
lastFlushTS = System.currentTimeMillis();
}
private CompletableFuture<Void> doFlush(final List<Holder> batch) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册