From 6f85ca9314da87ee48e59f247c836a331c13c0a0 Mon Sep 17 00:00:00 2001 From: ascrutae Date: Wed, 23 Nov 2016 11:42:12 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=8C=85=E5=90=8D=EF=BC=8C?= =?UTF-8?q?=E7=B1=BB=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/a/eye/skywalking/network/Client.java | 25 ++++++++++++++ .../skywalking/network/ConsumerProvider.java | 33 ------------------- .../{ServiceProvider.java => Server.java} | 26 +++++++-------- .../SpanStorageClient.java} | 6 ++-- .../AsyncTraceSearchServer.java} | 6 ++-- .../SpanStorageServer.java} | 6 ++-- .../TraceSearchServer.java} | 6 ++-- .../com/a/eye/skywalking/storage/Main.java | 10 +++--- .../src/test/java/StorageClient.java | 8 ++--- .../src/test/java/StorageThread.java | 14 ++++---- 10 files changed, 63 insertions(+), 77 deletions(-) create mode 100644 skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java delete mode 100644 skywalking-network/src/main/java/com/a/eye/skywalking/network/ConsumerProvider.java rename skywalking-network/src/main/java/com/a/eye/skywalking/network/{ServiceProvider.java => Server.java} (67%) rename skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/{consumer/SpanStorageConsumer.java => client/SpanStorageClient.java} (93%) rename skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/{provider/AsyncTraceSearchService.java => server/AsyncTraceSearchServer.java} (84%) rename skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/{provider/SpanStorageService.java => server/SpanStorageServer.java} (88%) rename skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/{provider/TraceSearchService.java => server/TraceSearchServer.java} (78%) diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java new file mode 100644 index 0000000000..b1d1f8b020 --- /dev/null +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java @@ -0,0 +1,25 @@ +package com.a.eye.skywalking.network; + +import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc; +import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub; + +public class Client { + private ManagedChannel channel; + private SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub; + + public Client(String ip, int address) { + channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build(); + spanStorageStub = newStub(channel); + } + + + public SpanStorageClient newSpanStorageConsumer() { + return new SpanStorageClient(spanStorageStub); + } + + +} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/ConsumerProvider.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/ConsumerProvider.java deleted file mode 100644 index 5969de3276..0000000000 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/ConsumerProvider.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.a.eye.skywalking.network; - -import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc; -import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer; -import com.a.eye.skywalking.network.grpc.provider.SpanStorageService; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; - -import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub; - -public class ConsumerProvider { - private static ConsumerProvider INSTANCE; - private ManagedChannel channel; - private SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub; - - private ConsumerProvider(String ip, int address) { - channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build(); - spanStorageStub = newStub(channel); - } - - public static ConsumerProvider INSTANCE() { - return INSTANCE; - } - - public SpanStorageConsumer newSpanStorageConsumer() { - return new SpanStorageConsumer(spanStorageStub); - } - - public static ConsumerProvider init(String ip, int address) { - return INSTANCE = new ConsumerProvider(ip, address); - } - -} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/ServiceProvider.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Server.java similarity index 67% rename from skywalking-network/src/main/java/com/a/eye/skywalking/network/ServiceProvider.java rename to skywalking-network/src/main/java/com/a/eye/skywalking/network/Server.java index 76bb2530b2..6770e18af4 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/ServiceProvider.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Server.java @@ -1,22 +1,20 @@ package com.a.eye.skywalking.network; -import com.a.eye.skywalking.network.grpc.provider.AsyncTraceSearchService; -import com.a.eye.skywalking.network.grpc.provider.SpanStorageService; -import com.a.eye.skywalking.network.grpc.provider.TraceSearchService; +import com.a.eye.skywalking.network.grpc.server.AsyncTraceSearchServer; +import com.a.eye.skywalking.network.grpc.server.SpanStorageServer; +import com.a.eye.skywalking.network.grpc.server.TraceSearchServer; import com.a.eye.skywalking.network.listener.AsyncTraceSearchListener; import com.a.eye.skywalking.network.listener.SpanStorageListener; import com.a.eye.skywalking.network.listener.TraceSearchListener; -import io.grpc.Server; -import io.grpc.ServerBuilder; import io.grpc.netty.NettyServerBuilder; import io.netty.channel.nio.NioEventLoopGroup; import java.io.IOException; -public class ServiceProvider { - private Server server; +public class Server { + private io.grpc.Server server; - private ServiceProvider(Server server) { + private Server(io.grpc.Server server) { this.server = server; } @@ -25,7 +23,7 @@ public class ServiceProvider { // 当JVM停止之后,Server也需要停止 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { - ServiceProvider.this.stop(); + Server.this.stop(); } }); } @@ -48,23 +46,23 @@ public class ServiceProvider { private NettyServerBuilder serverBuilder; - public ServiceProvider build() { - return new ServiceProvider(serverBuilder.bossEventLoopGroup(new NioEventLoopGroup(1)) + public Server build() { + return new Server(serverBuilder.bossEventLoopGroup(new NioEventLoopGroup(1)) .workerEventLoopGroup(new NioEventLoopGroup()).build()); } public TransferServiceBuilder addSpanStorageService(SpanStorageListener spanStorageListener) { - serverBuilder.addService(new SpanStorageService(spanStorageListener)); + serverBuilder.addService(new SpanStorageServer(spanStorageListener)); return this; } public TransferServiceBuilder addTraceSearchService(TraceSearchListener traceSearchListener) { - serverBuilder.addService(new TraceSearchService(traceSearchListener)); + serverBuilder.addService(new TraceSearchServer(traceSearchListener)); return this; } public TransferServiceBuilder addAsyncTraceSearchService(AsyncTraceSearchListener asyncTraceSearchListener){ - serverBuilder.addService(new AsyncTraceSearchService(asyncTraceSearchListener)); + serverBuilder.addService(new AsyncTraceSearchServer(asyncTraceSearchListener)); return this; } } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/consumer/SpanStorageConsumer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java similarity index 93% rename from skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/consumer/SpanStorageConsumer.java rename to skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java index f36d45650a..bd0a12fb7a 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/consumer/SpanStorageConsumer.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/SpanStorageClient.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.network.grpc.consumer; +package com.a.eye.skywalking.network.grpc.client; import com.a.eye.skywalking.network.exception.ConsumeSpanDataFailedException; import com.a.eye.skywalking.network.grpc.AckSpan; @@ -8,11 +8,11 @@ import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc; import io.grpc.stub.CallStreamObserver; import io.grpc.stub.StreamObserver; -public class SpanStorageConsumer { +public class SpanStorageClient { private final SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub; - public SpanStorageConsumer(SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub) { + public SpanStorageClient(SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub) { this.spanStorageStub = spanStorageStub; } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/AsyncTraceSearchService.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java similarity index 84% rename from skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/AsyncTraceSearchService.java rename to skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java index 9caad6ed29..3dd75a3261 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/AsyncTraceSearchService.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.network.grpc.provider; +package com.a.eye.skywalking.network.grpc.server; import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc; import com.a.eye.skywalking.network.grpc.QueryTask; @@ -12,11 +12,11 @@ import java.util.List; /** * Created by xin on 2016/11/15. */ -public class AsyncTraceSearchService extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase { +public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase { private AsyncTraceSearchListener searchListener; - public AsyncTraceSearchService(AsyncTraceSearchListener searchListener) { + public AsyncTraceSearchServer(AsyncTraceSearchListener searchListener) { this.searchListener = searchListener; } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/SpanStorageService.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java similarity index 88% rename from skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/SpanStorageService.java rename to skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java index ca2f2cfa29..1db52ac512 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/SpanStorageService.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.network.grpc.provider; +package com.a.eye.skywalking.network.grpc.server; import com.a.eye.skywalking.network.grpc.AckSpan; import com.a.eye.skywalking.network.grpc.RequestSpan; @@ -7,10 +7,10 @@ import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc; import com.a.eye.skywalking.network.listener.SpanStorageListener; import io.grpc.stub.StreamObserver; -public class SpanStorageService extends SpanStorageServiceGrpc.SpanStorageServiceImplBase { +public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageServiceImplBase { private SpanStorageListener listener; - public SpanStorageService(SpanStorageListener listener) { + public SpanStorageServer(SpanStorageListener listener) { this.listener = listener; } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/TraceSearchService.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java similarity index 78% rename from skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/TraceSearchService.java rename to skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java index 366c6a2413..b81c6d1c33 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/provider/TraceSearchService.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java @@ -1,4 +1,4 @@ -package com.a.eye.skywalking.network.grpc.provider; +package com.a.eye.skywalking.network.grpc.server; import com.a.eye.skywalking.network.grpc.SearchResult; import com.a.eye.skywalking.network.grpc.TraceId; @@ -9,11 +9,11 @@ import io.grpc.stub.StreamObserver; /** * Created by xin on 2016/11/12. */ -public class TraceSearchService extends TraceSearchServiceGrpc.TraceSearchServiceImplBase { +public class TraceSearchServer extends TraceSearchServiceGrpc.TraceSearchServiceImplBase { private TraceSearchListener traceSearchListener; - public TraceSearchService(TraceSearchListener traceSearchListener) { + public TraceSearchServer(TraceSearchListener traceSearchListener) { this.traceSearchListener = traceSearchListener; } diff --git a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java index 101f5b7ef6..723d5cd48a 100644 --- a/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java +++ b/skywalking-storage-center/skywalking-storage/src/main/java/com/a/eye/skywalking/storage/Main.java @@ -4,7 +4,7 @@ import com.a.eye.skywalking.health.report.HealthCollector; import com.a.eye.skywalking.logging.api.ILog; import com.a.eye.skywalking.logging.api.LogManager; import com.a.eye.skywalking.logging.impl.log4j2.Log4j2Resolver; -import com.a.eye.skywalking.network.ServiceProvider; +import com.a.eye.skywalking.network.Server; import com.a.eye.skywalking.registry.RegistryCenterFactory; import com.a.eye.skywalking.registry.api.CenterType; import com.a.eye.skywalking.registry.api.RegistryCenter; @@ -36,7 +36,7 @@ public class Main { LogManager.setLogResolver(new Log4j2Resolver()); } - private static ServiceProvider provider; + private static Server server; public static void main(String[] args) { try { @@ -48,9 +48,9 @@ public class Main { DataFilesManager.init(); - provider = ServiceProvider.newBuilder(Config.Server.PORT).addSpanStorageService(new StorageListener()) + server = Server.newBuilder(Config.Server.PORT).addSpanStorageService(new StorageListener()) .addAsyncTraceSearchService(new SearchListener()).build(); - provider.start(); + server.start(); if (logger.isDebugEnable()) { logger.debug("Service provider started."); @@ -64,7 +64,7 @@ public class Main { e.printStackTrace(); logger.error("SkyWalking storage server start failure.", e); } finally { - provider.stop(); + server.stop(); } } diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java index 538c49ddeb..404ec6677b 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageClient.java @@ -1,21 +1,17 @@ -import com.a.eye.skywalking.network.ConsumerProvider; - import java.util.concurrent.CountDownLatch; public class StorageClient { - private static ConsumerProvider consumerProvider; - private static int THREAD_COUNT = 4; private static final long COUNT = 1_000_000_000; public static void main(String[] args) throws InterruptedException { - consumerProvider = ConsumerProvider.init("10.128.7.241", 34000); + CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT); for (int i = 0; i < THREAD_COUNT; i++) { - new StorageThread(consumerProvider, COUNT, countDownLatch).start(); + new StorageThread(COUNT, countDownLatch).start(); } countDownLatch.await(); diff --git a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java index c0959f0839..fb75435a76 100644 --- a/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java +++ b/skywalking-storage-center/skywalking-storage/src/test/java/StorageThread.java @@ -1,20 +1,20 @@ -import com.a.eye.skywalking.network.ConsumerProvider; +import com.a.eye.skywalking.network.Client; import com.a.eye.skywalking.network.grpc.AckSpan; import com.a.eye.skywalking.network.grpc.RequestSpan; import com.a.eye.skywalking.network.grpc.TraceId; -import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer; +import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; import com.a.eye.skywalking.storage.util.NetUtils; import java.util.concurrent.CountDownLatch; public class StorageThread extends Thread { - private SpanStorageConsumer consumer; - private long count; - private CountDownLatch countDownLatch; + private SpanStorageClient consumer; + private long count; + private CountDownLatch countDownLatch; - StorageThread(ConsumerProvider consumerProvider, long count, CountDownLatch countDownLatch) { - consumer = consumerProvider.newSpanStorageConsumer(); + StorageThread(long count, CountDownLatch countDownLatch) { + consumer = new Client("10.128.7.241", 34000).newSpanStorageConsumer(); this.count = count; this.countDownLatch = countDownLatch; } -- GitLab