提交 aabb2474 编写于 作者: wu-sheng's avatar wu-sheng

fix client batch send performance issue, and test case performance issue.

上级 34f6bb9d
...@@ -17,7 +17,7 @@ public class Client { ...@@ -17,7 +17,7 @@ public class Client {
} }
public SpanStorageClient newSpanStorageConsumer() { public SpanStorageClient newSpanStorageClient() {
return new SpanStorageClient(spanStorageStub); return new SpanStorageClient(spanStorageStub);
} }
......
...@@ -16,7 +16,7 @@ public class SpanStorageClient { ...@@ -16,7 +16,7 @@ public class SpanStorageClient {
this.spanStorageStub = spanStorageStub; this.spanStorageStub = spanStorageStub;
} }
public void consumeRequestSpan(RequestSpan... requestSpan) { public void sendRequestSpan(RequestSpan... requestSpan) {
StreamObserver<RequestSpan> requestSpanStreamObserver = StreamObserver<RequestSpan> requestSpanStreamObserver =
spanStorageStub.storageRequestSpan(new StreamObserver<SendResult>() { spanStorageStub.storageRequestSpan(new StreamObserver<SendResult>() {
@Override @Override
...@@ -36,19 +36,19 @@ public class SpanStorageClient { ...@@ -36,19 +36,19 @@ public class SpanStorageClient {
for (RequestSpan span : requestSpan) { for (RequestSpan span : requestSpan) {
requestSpanStreamObserver.onNext(span); requestSpanStreamObserver.onNext(span);
while (!((CallStreamObserver<RequestSpan>) requestSpanStreamObserver).isReady()) { }
try { while (!((CallStreamObserver<RequestSpan>) requestSpanStreamObserver).isReady()) {
Thread.currentThread().sleep(1); try {
} catch (InterruptedException e) { Thread.currentThread().sleep(1);
throw new ConsumeSpanDataFailedException(e); } catch (InterruptedException e) {
} throw new ConsumeSpanDataFailedException(e);
} }
} }
requestSpanStreamObserver.onCompleted(); requestSpanStreamObserver.onCompleted();
} }
public void consumeACKSpan(AckSpan... ackSpan) { public void sendACKSpan(AckSpan... ackSpan) {
StreamObserver<AckSpan> ackSpanStreamObserver = StreamObserver<AckSpan> ackSpanStreamObserver =
spanStorageStub.storageACKSpan(new StreamObserver<SendResult>() { spanStorageStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override @Override
...@@ -68,12 +68,12 @@ public class SpanStorageClient { ...@@ -68,12 +68,12 @@ public class SpanStorageClient {
for (AckSpan span : ackSpan) { for (AckSpan span : ackSpan) {
ackSpanStreamObserver.onNext(span); ackSpanStreamObserver.onNext(span);
while (!((CallStreamObserver<AckSpan>) ackSpanStreamObserver).isReady()) { }
try { while (!((CallStreamObserver<AckSpan>) ackSpanStreamObserver).isReady()) {
Thread.currentThread().sleep(1); try {
} catch (InterruptedException e) { Thread.currentThread().sleep(1);
throw new ConsumeSpanDataFailedException(e); } catch (InterruptedException e) {
} throw new ConsumeSpanDataFailedException(e);
} }
} }
......
...@@ -5,42 +5,50 @@ import com.a.eye.skywalking.network.grpc.TraceId; ...@@ -5,42 +5,50 @@ import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.storage.util.NetUtils; import com.a.eye.skywalking.storage.util.NetUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
public class StorageThread extends Thread { public class StorageThread extends Thread {
private SpanStorageClient consumer; private SpanStorageClient client;
private long count; private long count;
private CountDownLatch countDownLatch; private CountDownLatch countDownLatch;
StorageThread(long count, CountDownLatch countDownLatch) { StorageThread(long count, CountDownLatch countDownLatch) {
consumer = new Client("10.128.7.241", 34000).newSpanStorageConsumer(); client = new Client("10.128.7.241", 34000).newSpanStorageClient();
this.count = count; this.count = count;
this.countDownLatch = countDownLatch; this.countDownLatch = countDownLatch;
} }
@Override @Override
public void run() { public void run() {
RequestSpan[] requestSpanList = new RequestSpan[10];
AckSpan[] ackSpanList = new AckSpan[10];
int cycle = 0;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
long value = System.currentTimeMillis(); long value = System.currentTimeMillis();
RequestSpan requestSpan = RequestSpan requestSpan = RequestSpan.newBuilder().setSpanType(1).setAddress(NetUtils.getLocalAddress().toString()).setApplicationId("1").setCallType("1").setLevelId(0)
RequestSpan.newBuilder().setSpanType(1).setAddress(NetUtils.getLocalAddress().toString()) .setProcessNo("19287").setStartDate(System.currentTimeMillis())
.setApplicationId("1").setCallType("1").setLevelId(0).setProcessNo("19287") .setTraceId(TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277).addSegments(53).addSegments(3).build())
.setStartDate(System.currentTimeMillis()).setTraceId( .setUserId("1").setViewPointId("http://localhost:8080/wwww/test/helloWorld").build();
TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setUserId("1")
.setViewPointId("http://localhost:8080/wwww/test/helloWorld").build();
AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId( AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277) TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277).addSegments(Thread.currentThread().getId()).addSegments(3)
.addSegments(Thread.currentThread().getId()).addSegments(3).build()).setStatusCode(0) .build()).setStatusCode(0).setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
.setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
if (cycle == 10) {
client.sendACKSpan(ackSpanList);
consumer.consumeACKSpan(ackSpan); client.sendRequestSpan(requestSpanList);
consumer.consumeRequestSpan(requestSpan); cycle = 0;
} else {
requestSpanList[cycle] = requestSpan;
ackSpanList[cycle] = ackSpan;
cycle++;
}
if (i % 1_000 == 0) { if (i % 10_000 == 0) {
System.out.println(i + " " + value); System.out.println(i + " " + value);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册