From aabb24744342b561ada5a0e5123b155ed372a5aa Mon Sep 17 00:00:00 2001 From: wusheng Date: Wed, 23 Nov 2016 21:19:47 +0800 Subject: [PATCH] fix client batch send performance issue, and test case performance issue. --- .../com/a/eye/skywalking/network/Client.java | 2 +- .../grpc/client/SpanStorageClient.java | 28 ++++++------- .../src/test/java/StorageThread.java | 42 +++++++++++-------- 3 files changed, 40 insertions(+), 32 deletions(-) diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java index b1d1f8b02..5c9681e2c 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java @@ -17,7 +17,7 @@ public class Client { } - public SpanStorageClient newSpanStorageConsumer() { + public SpanStorageClient newSpanStorageClient() { return new SpanStorageClient(spanStorageStub); } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java index bd0a12fb7..74731449e 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java @@ -16,7 +16,7 @@ public class SpanStorageClient { this.spanStorageStub = spanStorageStub; } - public void consumeRequestSpan(RequestSpan... requestSpan) { + public void sendRequestSpan(RequestSpan... requestSpan) { StreamObserver requestSpanStreamObserver = spanStorageStub.storageRequestSpan(new StreamObserver() { @Override @@ -36,19 +36,19 @@ public class SpanStorageClient { for (RequestSpan span : requestSpan) { requestSpanStreamObserver.onNext(span); - while (!((CallStreamObserver) requestSpanStreamObserver).isReady()) { - try { - Thread.currentThread().sleep(1); - } catch (InterruptedException e) { - throw new ConsumeSpanDataFailedException(e); - } + } + while (!((CallStreamObserver) requestSpanStreamObserver).isReady()) { + try { + Thread.currentThread().sleep(1); + } catch (InterruptedException e) { + throw new ConsumeSpanDataFailedException(e); } } requestSpanStreamObserver.onCompleted(); } - public void consumeACKSpan(AckSpan... ackSpan) { + public void sendACKSpan(AckSpan... ackSpan) { StreamObserver ackSpanStreamObserver = spanStorageStub.storageACKSpan(new StreamObserver() { @Override @@ -68,12 +68,12 @@ public class SpanStorageClient { for (AckSpan span : ackSpan) { ackSpanStreamObserver.onNext(span); - while (!((CallStreamObserver) ackSpanStreamObserver).isReady()) { - try { - Thread.currentThread().sleep(1); - } catch (InterruptedException e) { - throw new ConsumeSpanDataFailedException(e); - } + } + while (!((CallStreamObserver) ackSpanStreamObserver).isReady()) { + try { + Thread.currentThread().sleep(1); + } catch (InterruptedException e) { + throw new ConsumeSpanDataFailedException(e); } } diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java index fb75435a7..30e0614cc 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java @@ -5,42 +5,50 @@ import com.a.eye.skywalking.network.grpc.TraceId; import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; import com.a.eye.skywalking.storage.util.NetUtils; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; public class StorageThread extends Thread { - private SpanStorageClient consumer; + private SpanStorageClient client; private long count; private CountDownLatch countDownLatch; StorageThread(long count, CountDownLatch countDownLatch) { - consumer = new Client("10.128.7.241", 34000).newSpanStorageConsumer(); + client = new Client("10.128.7.241", 34000).newSpanStorageClient(); this.count = count; this.countDownLatch = countDownLatch; } @Override public void run() { + RequestSpan[] requestSpanList = new RequestSpan[10]; + AckSpan[] ackSpanList = new AckSpan[10]; + int cycle = 0; for (int i = 0; i < count; i++) { + long value = System.currentTimeMillis(); - RequestSpan requestSpan = - RequestSpan.newBuilder().setSpanType(1).setAddress(NetUtils.getLocalAddress().toString()) - .setApplicationId("1").setCallType("1").setLevelId(0).setProcessNo("19287") - .setStartDate(System.currentTimeMillis()).setTraceId( - TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828) - .addSegments(2277).addSegments(53).addSegments(3).build()).setUserId("1") - .setViewPointId("http://localhost:8080/wwww/test/helloWorld").build(); + RequestSpan requestSpan = RequestSpan.newBuilder().setSpanType(1).setAddress(NetUtils.getLocalAddress().toString()).setApplicationId("1").setCallType("1").setLevelId(0) + .setProcessNo("19287").setStartDate(System.currentTimeMillis()) + .setTraceId(TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277).addSegments(53).addSegments(3).build()) + .setUserId("1").setViewPointId("http://localhost:8080/wwww/test/helloWorld").build(); AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId( - TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277) - .addSegments(Thread.currentThread().getId()).addSegments(3).build()).setStatusCode(0) - .setViewpointId("http://localhost:8080/wwww/test/helloWorld").build(); - - - consumer.consumeACKSpan(ackSpan); - consumer.consumeRequestSpan(requestSpan); + TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277).addSegments(Thread.currentThread().getId()).addSegments(3) + .build()).setStatusCode(0).setViewpointId("http://localhost:8080/wwww/test/helloWorld").build(); + + if (cycle == 10) { + client.sendACKSpan(ackSpanList); + client.sendRequestSpan(requestSpanList); + cycle = 0; + } else { + requestSpanList[cycle] = requestSpan; + ackSpanList[cycle] = ackSpan; + cycle++; + } - if (i % 1_000 == 0) { + if (i % 10_000 == 0) { System.out.println(i + " " + value); } } -- GitLab