/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.skywalking.library.elasticsearch.bulk;

import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.library.elasticsearch.ElasticSearch;
import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;

import static java.util.Objects.requireNonNull;

@Slf4j
public final class BulkProcessor {
45
    private final ArrayBlockingQueue<Holder> requests;
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77

    private final AtomicReference<ElasticSearch> es;
    private final int bulkActions;
    private final Semaphore semaphore;

    public static BulkProcessorBuilder builder() {
        return new BulkProcessorBuilder();
    }

    BulkProcessor(
        final AtomicReference<ElasticSearch> es, final int bulkActions,
        final Duration flushInterval, final int concurrentRequests) {
        requireNonNull(flushInterval, "flushInterval");

        this.es = requireNonNull(es, "es");
        this.bulkActions = bulkActions;
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
        this.requests = new ArrayBlockingQueue<>(bulkActions + 1);

        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
            1, r -> {
            final Thread thread = new Thread(r);
            thread.setName("ElasticSearch BulkProcessor");
            return thread;
        });
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        scheduler.scheduleWithFixedDelay(
            this::flush, 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
    }

78 79
    public CompletableFuture<Void> add(IndexRequest request) {
        return internalAdd(request);
80 81
    }

82 83
    public CompletableFuture<Void> add(UpdateRequest request) {
        return internalAdd(request);
84 85
    }

86
    @SneakyThrows
87
    private CompletableFuture<Void> internalAdd(Object request) {
88
        requireNonNull(request, "request");
89 90
        final CompletableFuture<Void> f = new CompletableFuture<>();
        requests.put(new Holder(f, request));
91
        flushIfNeeded();
92
        return f;
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
    }

    @SneakyThrows
    private void flushIfNeeded() {
        if (requests.size() >= bulkActions) {
            flush();
        }
    }

    void flush() {
        if (requests.isEmpty()) {
            return;
        }

        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to get semaphore to execute bulk requests", e);
            return;
        }

114
        final List<Holder> batch = new ArrayList<>(requests.size());
115 116 117 118 119 120 121
        requests.drainTo(batch);

        final CompletableFuture<Void> flush = doFlush(batch);
        flush.whenComplete((ignored1, ignored2) -> semaphore.release());
        flush.join();
    }

122
    private CompletableFuture<Void> doFlush(final List<Holder> batch) {
123 124
        log.debug("Executing bulk with {} requests", batch.size());

125 126 127 128
        if (batch.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

129 130 131 132
        final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
            try {
                final RequestFactory rf = v.requestFactory();
                final List<byte[]> bs = new ArrayList<>();
133 134
                for (final Holder holder : batch) {
                    bs.add(v.codec().encode(holder.request));
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
                    bs.add("\n".getBytes());
                }
                final ByteBuf content = Unpooled.wrappedBuffer(bs.toArray(new byte[0][]));
                return es.get().client().execute(rf.bulk().bulk(content))
                         .aggregate().thenAccept(response -> {
                        final HttpStatus status = response.status();
                        if (status != HttpStatus.OK) {
                            throw new RuntimeException(response.contentUtf8());
                        }
                    });
            } catch (Exception e) {
                return Exceptions.throwUnsafely(e);
            }
        });
        future.whenComplete((ignored, exception) -> {
            if (exception != null) {
151 152
                batch.stream().map(it -> it.future)
                     .forEach(it -> it.completeExceptionally(exception));
153 154 155
                log.error("Failed to execute requests in bulk", exception);
            } else {
                log.debug("Succeeded to execute {} requests in bulk", batch.size());
156
                batch.stream().map(it -> it.future).forEach(it -> it.complete(null));
157 158 159 160
            }
        });
        return future;
    }
161 162 163 164 165 166

    @RequiredArgsConstructor
    static class Holder {
        private final CompletableFuture<Void> future;
        private final Object request;
    }
167
}