diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/ConsumerProvider.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/ConsumerProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..5969de32767c2d936938bd0a2f071a3977ceb378 --- /dev/null +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/ConsumerProvider.java @@ -0,0 +1,33 @@ +package com.a.eye.skywalking.network; + +import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc; +import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer; +import com.a.eye.skywalking.network.grpc.provider.SpanStorageService; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub; + +public class ConsumerProvider { + private static ConsumerProvider INSTANCE; + private ManagedChannel channel; + private SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub; + + private ConsumerProvider(String ip, int address) { + channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build(); + spanStorageStub = newStub(channel); + } + + public static ConsumerProvider INSTANCE() { + return INSTANCE; + } + + public SpanStorageConsumer newSpanStorageConsumer() { + return new SpanStorageConsumer(spanStorageStub); + } + + public static ConsumerProvider init(String ip, int address) { + return INSTANCE = new ConsumerProvider(ip, address); + } + +} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/ServiceProvider.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/ServiceProvider.java index 5c060aa10b5568c42ece7b080c0f443d71d0932e..76bb2530b2fe71e5ef56a2ba346ad6c53ad13467 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/ServiceProvider.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/ServiceProvider.java @@ -43,6 +43,7 @@ public class ServiceProvider { public static class TransferServiceBuilder { private TransferServiceBuilder(int port) { serverBuilder = NettyServerBuilder.forPort(port); + serverBuilder.maxConcurrentCallsPerConnection(4); } private NettyServerBuilder serverBuilder; diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/exception/ConsumeSpanDataFailedException.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/exception/ConsumeSpanDataFailedException.java new file mode 100644 index 0000000000000000000000000000000000000000..cd50960bba65bf90f4c537e2540ea68720858509 --- /dev/null +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/exception/ConsumeSpanDataFailedException.java @@ -0,0 +1,9 @@ +package com.a.eye.skywalking.network.exception; + +/** + * Created by xin on 2016/11/23. + */ +public class ConsumeSpanDataFailedException extends RuntimeException { + public ConsumeSpanDataFailedException(Exception e) { + } +} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/consumer/SpanStorageConsumer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/consumer/SpanStorageConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..f36d45650a715d7706c94918605e93af45d8366c --- /dev/null +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/consumer/SpanStorageConsumer.java @@ -0,0 +1,83 @@ +package com.a.eye.skywalking.network.grpc.consumer; + +import com.a.eye.skywalking.network.exception.ConsumeSpanDataFailedException; +import com.a.eye.skywalking.network.grpc.AckSpan; +import com.a.eye.skywalking.network.grpc.RequestSpan; +import com.a.eye.skywalking.network.grpc.SendResult; +import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; + +public class SpanStorageConsumer { + + private final SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub; + + public SpanStorageConsumer(SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub) { + this.spanStorageStub = spanStorageStub; + } + + public void consumeRequestSpan(RequestSpan... requestSpan) { + StreamObserver requestSpanStreamObserver = + spanStorageStub.storageRequestSpan(new StreamObserver() { + @Override + public void onNext(SendResult sendResult) { + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }); + + for (RequestSpan span : requestSpan) { + requestSpanStreamObserver.onNext(span); + while (!((CallStreamObserver) requestSpanStreamObserver).isReady()) { + try { + Thread.currentThread().sleep(1); + } catch (InterruptedException e) { + throw new ConsumeSpanDataFailedException(e); + } + } + } + + requestSpanStreamObserver.onCompleted(); + } + + public void consumeACKSpan(AckSpan... ackSpan) { + StreamObserver ackSpanStreamObserver = + spanStorageStub.storageACKSpan(new StreamObserver() { + @Override + public void onNext(SendResult sendResult) { + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }); + + for (AckSpan span : ackSpan) { + ackSpanStreamObserver.onNext(span); + while (!((CallStreamObserver) ackSpanStreamObserver).isReady()) { + try { + Thread.currentThread().sleep(1); + } catch (InterruptedException e) { + throw new ConsumeSpanDataFailedException(e); + } + } + } + + ackSpanStreamObserver.onCompleted(); + } + +} diff --git a/skywalking-storage-center/skywalking-storage/pom.xml b/skywalking-storage-center/skywalking-storage/pom.xml index 5c4d339e43538363f3fe14b62510ad3a3ff6ce8c..74b97d62d4f965e36af195f65d5193a7c7da3f9d 100644 --- a/skywalking-storage-center/skywalking-storage/pom.xml +++ b/skywalking-storage-center/skywalking-storage/pom.xml @@ -132,7 +132,7 @@ + includes="*"/> @@ -157,7 +157,7 @@ + includes="*"/> diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java index f3c7c8e59c159b69d3361b34fb3cb7cfde276fdf..d5efd6cb71220937c005081be6c4f339c8ce124d 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/SpanDataConsumer.java @@ -17,17 +17,18 @@ public class SpanDataConsumer implements IConsumer { private static ILog logger = LogManager.getLogger(SpanDataConsumer.class); private DataFileWriter fileWriter; + private IndexOperator operator; @Override public void init() { fileWriter = new DataFileWriter(); + operator = IndexOperatorFactory.createIndexOperator(); } @Override public void consume(List data) { IndexMetaCollection collection = fileWriter.write(data); - IndexOperator operator = IndexOperatorFactory.createIndexOperator(); operator.batchUpdate(collection); HealthCollector.getCurrentHeathReading("SpanDataConsumer") diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperatorFactory.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperatorFactory.java index 59e1aca37ac3478d0f0d75aede295b520c23e419..91fbfbf21b23d796dcae3578411c1c3af8123ae3 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperatorFactory.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/data/index/IndexOperatorFactory.java @@ -18,7 +18,7 @@ public class IndexOperatorFactory { public static IndexOperator createIndexOperator() { try { return new IndexOperator(new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress( - new InetSocketTransportAddress(InetAddress.getLocalHost(), NetUtils.getIndexServerPort() + new InetSocketTransportAddress(InetAddress.getLoopbackAddress(), NetUtils.getIndexServerPort() ))); } catch (Exception e) { throw new IndexOperatorInitializeFailedException("Failed to initialize operator.", e); diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java index 268c5034ee93792fdf4d44512e8125818a702068..538c49ddeba192c63271e4385d2c7b0010f502e9 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java @@ -1,104 +1,23 @@ -import com.a.eye.skywalking.network.dependencies.io.grpc.ManagedChannel; -import com.a.eye.skywalking.network.dependencies.io.grpc.ManagedChannelBuilder; -import com.a.eye.skywalking.network.dependencies.io.grpc.stub.ClientCallStreamObserver; -import com.a.eye.skywalking.network.dependencies.io.grpc.stub.ServerCallStreamObserver; -import com.a.eye.skywalking.network.dependencies.io.grpc.stub.StreamObserver; -import com.a.eye.skywalking.network.grpc.*; +import com.a.eye.skywalking.network.ConsumerProvider; -import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub; +import java.util.concurrent.CountDownLatch; public class StorageClient { - private static ManagedChannel channel = - ManagedChannelBuilder.forAddress("127.0.0.1", 34000).usePlaintext(true).build(); + private static ConsumerProvider consumerProvider; - private static SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageServiceStub = newStub(channel); - - private static long endTime1 = 0; - - private static long endTime2 = 0; + private static int THREAD_COUNT = 4; + private static final long COUNT = 1_000_000_000; public static void main(String[] args) throws InterruptedException { - long startTime = System.currentTimeMillis(); - for (int i = 0; i < 1; i++) { - long value = System.currentTimeMillis(); - RequestSpan requestSpan = - RequestSpan.newBuilder().setSpanType(1).setAddress("127.0.0.1").setApplicationId("1") - .setCallType("1").setLevelId(0).setProcessNo("19287") - .setStartDate(System.currentTimeMillis()).setTraceId( - TraceId.newBuilder().addSegments(201611).addSegments(value) - .addSegments(8504828).addSegments(2277).addSegments(53).addSegments(3).build()) - .setUserId("1").setViewPointId("http://localhost:8080/wwww/test/helloWorld").build(); - - AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId( - TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277) - .addSegments(53).addSegments(3).build()).setStatusCode(0) - .setViewpointId("http://localhost:8080/wwww/test/helloWorld").build(); - - StreamObserver ackSpanStreamObserver = - spanStorageServiceStub.storageACKSpan(new StreamObserver() { - @Override - public void onNext(SendResult sendResult) { - } - - @Override - public void onError(Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void onCompleted() { - endTime1 = System.currentTimeMillis(); - } - }); - StreamObserver requestSpanStreamObserver = - spanStorageServiceStub.storageRequestSpan(new StreamObserver() { - @Override - public void onNext(SendResult sendResult) { - - } - - @Override - public void onError(Throwable throwable) { - throwable.printStackTrace(); - } - - @Override - public void onCompleted() { - endTime2 = System.currentTimeMillis(); - } - }); - for (int j = 0; j < 1; j++) { - requestSpanStreamObserver.onNext(requestSpan); - ackSpanStreamObserver.onNext(ackSpan); - - } - - ClientCallStreamObserver newRequestSpanStreamObserver = - (ClientCallStreamObserver) requestSpanStreamObserver; - - while (!newRequestSpanStreamObserver.isReady()) { - Thread.sleep(1); - } - - ackSpanStreamObserver.onCompleted(); - requestSpanStreamObserver.onCompleted(); - - - if (i % 1_000 == 0) { - System.out.println(i); - } + consumerProvider = ConsumerProvider.init("10.128.7.241", 34000); + CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT); + for (int i = 0; i < THREAD_COUNT; i++) { + new StorageThread(consumerProvider, COUNT, countDownLatch).start(); } - Thread.sleep(1000L); - - System.out.println("save execute in " + (endTime1 - startTime) + "ms"); - System.out.println("save execute2 in " + (endTime2 - startTime) + "ms"); - - - Thread.sleep(10000); - + countDownLatch.await(); } } diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java new file mode 100644 index 0000000000000000000000000000000000000000..c0959f083955274e3d86b12a46fcd2186158c413 --- /dev/null +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java @@ -0,0 +1,50 @@ +import com.a.eye.skywalking.network.ConsumerProvider; +import com.a.eye.skywalking.network.grpc.AckSpan; +import com.a.eye.skywalking.network.grpc.RequestSpan; +import com.a.eye.skywalking.network.grpc.TraceId; +import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer; +import com.a.eye.skywalking.storage.util.NetUtils; + +import java.util.concurrent.CountDownLatch; + +public class StorageThread extends Thread { + + private SpanStorageConsumer consumer; + private long count; + private CountDownLatch countDownLatch; + + StorageThread(ConsumerProvider consumerProvider, long count, CountDownLatch countDownLatch) { + consumer = consumerProvider.newSpanStorageConsumer(); + this.count = count; + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + for (int i = 0; i < count; i++) { + long value = System.currentTimeMillis(); + RequestSpan requestSpan = + RequestSpan.newBuilder().setSpanType(1).setAddress(NetUtils.getLocalAddress().toString()) + .setApplicationId("1").setCallType("1").setLevelId(0).setProcessNo("19287") + .setStartDate(System.currentTimeMillis()).setTraceId( + TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828) + .addSegments(2277).addSegments(53).addSegments(3).build()).setUserId("1") + .setViewPointId("http://localhost:8080/wwww/test/helloWorld").build(); + + AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId( + TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277) + .addSegments(Thread.currentThread().getId()).addSegments(3).build()).setStatusCode(0) + .setViewpointId("http://localhost:8080/wwww/test/helloWorld").build(); + + + consumer.consumeACKSpan(ackSpan); + consumer.consumeRequestSpan(requestSpan); + + if (i % 1_000 == 0) { + System.out.println(i + " " + value); + } + } + + countDownLatch.countDown(); + } +} diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java b/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java index 92fff7856d0e025879e180e2ea89dbf4c2356d63..825102a9c5f9edcef8b32ac33ba7b1cc75707d34 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/com/a/eye/skywalking/storage/SearchClient.java @@ -10,7 +10,7 @@ import static com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc.newS public class SearchClient { private static ManagedChannel channel = - ManagedChannelBuilder.forAddress("127.0.0.1", 34000).usePlaintext(true).build(); + ManagedChannelBuilder.forAddress("10.128.7.241", 34000).usePlaintext(true).build(); private static AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceStub searchServiceStub = newStub(channel); @@ -33,11 +33,11 @@ public class SearchClient { } }; - StreamObserver searchResult = searchServiceStub.search(serverStreamObserver); + StreamObserver searchResult = searchServiceStub.search(serverStreamObserver); searchResult.onNext(QueryTask.newBuilder().setTraceId( - TraceId.newBuilder().addSegments(201611).addSegments(1479717228982L).addSegments(8504828) - .addSegments(2277).addSegments(53).addSegments(3).build()).setTaskId(1).build()); + TraceId.newBuilder().addSegments(201611).addSegments(1479803629139L).addSegments(8504828) + .addSegments(2277).addSegments(53).addSegments(3).build()).build()); searchResult.onCompleted(); Thread.sleep(10000);