diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java index 2ac8cc3d2a8c48da10e80620ec0d415734271f78..1bd135d11916b37ddc7baeb4ff1e91a76074cb52 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java @@ -27,6 +27,7 @@ public class SpanStorageClient { StreamObserver requestSpanStreamObserver = spanStorageStub.storageRequestSpan(new StreamObserver() { @Override public void onNext(SendResult sendResult) { + listener.onBatchFinished(); } @Override @@ -36,7 +37,6 @@ public class SpanStorageClient { @Override public void onCompleted() { - listener.onBatchFinished(); } }); @@ -44,10 +44,6 @@ public class SpanStorageClient { requestSpanStreamObserver.onNext(span); } - while (!((CallStreamObserver) requestSpanStreamObserver).isReady()) { - LockSupport.parkNanos(1); - } - requestSpanStreamObserver.onCompleted(); } @@ -55,7 +51,7 @@ public class SpanStorageClient { StreamObserver ackSpanStreamObserver = spanStorageStub.storageACKSpan(new StreamObserver() { @Override public void onNext(SendResult sendResult) { - + listener.onBatchFinished(); } @Override @@ -65,7 +61,6 @@ public class SpanStorageClient { @Override public void onCompleted() { - listener.onBatchFinished(); } }); @@ -73,10 +68,6 @@ public class SpanStorageClient { ackSpanStreamObserver.onNext(span); } - while (!((CallStreamObserver) ackSpanStreamObserver).isReady()) { - LockSupport.parkNanos(1); - } - ackSpanStreamObserver.onCompleted(); } diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java index 404ec6677b1e1d1535349383000c83fe54b3e869..9374bac4871485eac3c51ecf4298ac8261be6f3e 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java @@ -11,7 +11,7 @@ public class StorageClient { CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT); for (int i = 0; i < THREAD_COUNT; i++) { - new StorageThread(COUNT, countDownLatch).start(); + new StorageThread(COUNT, countDownLatch, i).start(); } countDownLatch.await(); diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java index f65afddc68df36542907364f89ced2f2039eadaf..6dbbd02bacc08a0fcf992c82281692ee462b5cd2 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java @@ -18,13 +18,15 @@ public class StorageThread extends Thread { private long count; private CountDownLatch countDownLatch; private MyStorageClientListener listener; + private int index; - StorageThread(long count, CountDownLatch countDownLatch) { + StorageThread(long count, CountDownLatch countDownLatch, int index) { listener = new MyStorageClientListener(); client = new Client("127.0.0.1", 34000).newSpanStorageClient(listener); this.count = count; this.countDownLatch = countDownLatch; + this.index = index; } @Override @@ -44,7 +46,7 @@ public class StorageThread extends Thread { TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277).addSegments(Thread.currentThread().getId()).addSegments(3) .build()).setStatusCode(0).setViewpointId("http://localhost:8080/wwww/test/helloWorld").build(); - if (cycle == 10) { + if (cycle == 100) { client.sendACKSpan(ackSpanList); client.sendRequestSpan(requestSpanList); cycle = 0; @@ -62,7 +64,7 @@ public class StorageThread extends Thread { cycle++; if (i % 10_000 == 0) { - System.out.println(i + " " + value); + System.out.println("index-" + index + " num=" + i + " " + value); } }