From 77f096d46f6f2314d14fc0ed992530d501dfa788 Mon Sep 17 00:00:00 2001 From: wusheng Date: Sun, 27 Nov 2016 17:24:08 +0800 Subject: [PATCH] Using waiting grpc send status, instead of reponse. Avoid performance loss. --- .../grpc/client/SpanStorageClient.java | 68 ++++++++++--------- .../client/StorageClientListener.java | 2 +- .../client/Agent2RoutingClient.java | 2 +- .../src/test/java/StorageThread.java | 2 +- 4 files changed, 40 insertions(+), 34 deletions(-) diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java index a57a5fb49b..2ac8cc3d2a 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java @@ -10,6 +10,7 @@ import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.concurrent.locks.LockSupport; public class SpanStorageClient { @@ -23,54 +24,59 @@ public class SpanStorageClient { } public void sendRequestSpan(List requestSpan) { - StreamObserver requestSpanStreamObserver = - spanStorageStub.storageRequestSpan(new StreamObserver() { - @Override - public void onNext(SendResult sendResult) { - listener.onBatchFinished(sendResult); - } - - @Override - public void onError(Throwable throwable) { - listener.onError(throwable); - } - - @Override - public void onCompleted() { - - } - }); + StreamObserver requestSpanStreamObserver = spanStorageStub.storageRequestSpan(new StreamObserver() { + @Override + public void onNext(SendResult sendResult) { + } + + @Override + public void onError(Throwable throwable) { + listener.onError(throwable); + } + + @Override + public void onCompleted() { + listener.onBatchFinished(); + } + }); for (RequestSpan span : requestSpan) { requestSpanStreamObserver.onNext(span); } + while (!((CallStreamObserver) requestSpanStreamObserver).isReady()) { + LockSupport.parkNanos(1); + } + requestSpanStreamObserver.onCompleted(); } public void sendACKSpan(List ackSpan) { - StreamObserver ackSpanStreamObserver = - spanStorageStub.storageACKSpan(new StreamObserver() { - @Override - public void onNext(SendResult sendResult) { - listener.onBatchFinished(sendResult); - } + StreamObserver ackSpanStreamObserver = spanStorageStub.storageACKSpan(new StreamObserver() { + @Override + public void onNext(SendResult sendResult) { - @Override - public void onError(Throwable throwable) { - listener.onError(throwable); - } + } - @Override - public void onCompleted() { + @Override + public void onError(Throwable throwable) { + listener.onError(throwable); + } - } - }); + @Override + public void onCompleted() { + listener.onBatchFinished(); + } + }); for (AckSpan span : ackSpan) { ackSpanStreamObserver.onNext(span); } + while (!((CallStreamObserver) ackSpanStreamObserver).isReady()) { + LockSupport.parkNanos(1); + } + ackSpanStreamObserver.onCompleted(); } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/StorageClientListener.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/StorageClientListener.java index 75bca8d640..469e043b14 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/StorageClientListener.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/StorageClientListener.java @@ -8,5 +8,5 @@ import com.a.eye.skywalking.network.grpc.SendResult; public interface StorageClientListener { void onError(Throwable throwable); - void onBatchFinished(SendResult sendResult); + void onBatchFinished(); } diff --git a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/client/Agent2RoutingClient.java b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/client/Agent2RoutingClient.java index e3f931f967..fdb02d8d02 100644 --- a/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/client/Agent2RoutingClient.java +++ b/skywalking-sniffer/skywalking-api/src/main/java/com/a/eye/skywalking/client/Agent2RoutingClient.java @@ -134,7 +134,7 @@ public class Agent2RoutingClient extends Thread { } @Override - public void onBatchFinished(SendResult sendResult) { + public void onBatchFinished() { batchFinished = true; HealthCollector.getCurrentHeathReading("Agent2RoutingClient").updateData(HeathReading.INFO, "batch send data to routing node."); } diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java index 4465104d80..236672a337 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java @@ -76,7 +76,7 @@ public class StorageThread extends Thread { } @Override - public void onBatchFinished(SendResult sendResult) { + public void onBatchFinished() { isCompleted = true; } -- GitLab