From 6b1014c33673e05db8e1296f3995ec9a493932fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Fri, 2 Nov 2018 10:33:24 +0800 Subject: [PATCH] Fixed the collector OOM bug. (#1862) * Fixed the bug of remote client not blocked when not received on complete message, it will carry the out of a memory exception. * Sleep 10ms, not to sleep max 10ms. * No more than 10 stream observers are allowed at the same time to send remote message. Otherwise block the remote queue. * no message --- .../core/remote/client/GRPCRemoteClient.java | 71 ++++++------------- 1 file changed, 23 insertions(+), 48 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java index 104f3167c3..e367f4ddb4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.remote.client; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; @@ -40,6 +41,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable carrier; private final StreamDataClassGetter streamDataClassGetter; + private final AtomicInteger concurrentStreamObserverNumber = new AtomicInteger(0); public GRPCRemoteClient(StreamDataClassGetter streamDataClassGetter, RemoteInstance remoteInstance, int channelSize, int bufferSize) { @@ -67,6 +69,7 @@ public class GRPCRemoteClient implements RemoteClient, Comparable remoteMessages) { StreamObserver streamObserver = createStreamObserver(); + for (RemoteMessage remoteMessage : remoteMessages) { streamObserver.onNext(remoteMessage); } @@ -84,67 +87,39 @@ public class GRPCRemoteClient implements RemoteClient, Comparable createStreamObserver() { RemoteServiceGrpc.RemoteServiceStub stub = RemoteServiceGrpc.newStub(client.getChannel()); - StreamStatus status = new StreamStatus(false); + int sleepTotalMillis = 0; + int sleepMillis = 10; + while (concurrentStreamObserverNumber.incrementAndGet() > 10) { + concurrentStreamObserverNumber.addAndGet(-1); + + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + + sleepTotalMillis += sleepMillis; + + if (sleepTotalMillis > 60000) { + logger.warn("Remote client block times over 60 seconds."); + } + } + return stub.call(new StreamObserver() { @Override public void onNext(Empty empty) { } @Override public void onError(Throwable throwable) { + concurrentStreamObserverNumber.addAndGet(-1); logger.error(throwable.getMessage(), throwable); } @Override public void onCompleted() { - status.finished(); + concurrentStreamObserverNumber.addAndGet(-1); } }); } - class StreamStatus { - - private final Logger logger = LoggerFactory.getLogger(StreamStatus.class); - - private volatile boolean status; - - StreamStatus(boolean status) { - this.status = status; - } - - public boolean isFinish() { - return status; - } - - void finished() { - this.status = true; - } - - /** - * @param maxTimeout max wait time, milliseconds. - */ - public void wait4Finish(long maxTimeout) { - long time = 0; - while (!status) { - if (time > maxTimeout) { - break; - } - try2Sleep(5); - time += 5; - } - } - - /** - * Try to sleep, and ignore the {@link InterruptedException} - * - * @param millis the length of time to sleep in milliseconds - */ - private void try2Sleep(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - } - @Override public int compareTo(GRPCRemoteClient o) { return this.client.toString().compareTo(o.client.toString()); } -- GitLab