提交 6f85ca93 编写于 作者: A ascrutae

重构包名,类名

上级 4ebacf09
package com.a.eye.skywalking.network;
import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc;
import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer;
import com.a.eye.skywalking.network.grpc.provider.SpanStorageService;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub;
public class ConsumerProvider {
private static ConsumerProvider INSTANCE;
private ManagedChannel channel;
private SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub;
public class Client {
private ManagedChannel channel;
private SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub;
private ConsumerProvider(String ip, int address) {
public Client(String ip, int address) {
channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build();
spanStorageStub = newStub(channel);
}
public static ConsumerProvider INSTANCE() {
return INSTANCE;
}
public SpanStorageConsumer newSpanStorageConsumer() {
return new SpanStorageConsumer(spanStorageStub);
public SpanStorageClient newSpanStorageConsumer() {
return new SpanStorageClient(spanStorageStub);
}
public static ConsumerProvider init(String ip, int address) {
return INSTANCE = new ConsumerProvider(ip, address);
}
}
package com.a.eye.skywalking.network;
import com.a.eye.skywalking.network.grpc.provider.AsyncTraceSearchService;
import com.a.eye.skywalking.network.grpc.provider.SpanStorageService;
import com.a.eye.skywalking.network.grpc.provider.TraceSearchService;
import com.a.eye.skywalking.network.grpc.server.AsyncTraceSearchServer;
import com.a.eye.skywalking.network.grpc.server.SpanStorageServer;
import com.a.eye.skywalking.network.grpc.server.TraceSearchServer;
import com.a.eye.skywalking.network.listener.AsyncTraceSearchListener;
import com.a.eye.skywalking.network.listener.SpanStorageListener;
import com.a.eye.skywalking.network.listener.TraceSearchListener;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
public class ServiceProvider {
private Server server;
public class Server {
private io.grpc.Server server;
private ServiceProvider(Server server) {
private Server(io.grpc.Server server) {
this.server = server;
}
......@@ -25,7 +23,7 @@ public class ServiceProvider {
// 当JVM停止之后,Server也需要停止
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
ServiceProvider.this.stop();
Server.this.stop();
}
});
}
......@@ -48,23 +46,23 @@ public class ServiceProvider {
private NettyServerBuilder serverBuilder;
public ServiceProvider build() {
return new ServiceProvider(serverBuilder.bossEventLoopGroup(new NioEventLoopGroup(1))
public Server build() {
return new Server(serverBuilder.bossEventLoopGroup(new NioEventLoopGroup(1))
.workerEventLoopGroup(new NioEventLoopGroup()).build());
}
public TransferServiceBuilder addSpanStorageService(SpanStorageListener spanStorageListener) {
serverBuilder.addService(new SpanStorageService(spanStorageListener));
serverBuilder.addService(new SpanStorageServer(spanStorageListener));
return this;
}
public TransferServiceBuilder addTraceSearchService(TraceSearchListener traceSearchListener) {
serverBuilder.addService(new TraceSearchService(traceSearchListener));
serverBuilder.addService(new TraceSearchServer(traceSearchListener));
return this;
}
public TransferServiceBuilder addAsyncTraceSearchService(AsyncTraceSearchListener asyncTraceSearchListener){
serverBuilder.addService(new AsyncTraceSearchService(asyncTraceSearchListener));
serverBuilder.addService(new AsyncTraceSearchServer(asyncTraceSearchListener));
return this;
}
}
......
package com.a.eye.skywalking.network.grpc.consumer;
package com.a.eye.skywalking.network.grpc.client;
import com.a.eye.skywalking.network.exception.ConsumeSpanDataFailedException;
import com.a.eye.skywalking.network.grpc.AckSpan;
......@@ -8,11 +8,11 @@ import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
public class SpanStorageConsumer {
public class SpanStorageClient {
private final SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub;
public SpanStorageConsumer(SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub) {
public SpanStorageClient(SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub) {
this.spanStorageStub = spanStorageStub;
}
......
package com.a.eye.skywalking.network.grpc.provider;
package com.a.eye.skywalking.network.grpc.server;
import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc;
import com.a.eye.skywalking.network.grpc.QueryTask;
......@@ -12,11 +12,11 @@ import java.util.List;
/**
* Created by xin on 2016/11/15.
*/
public class AsyncTraceSearchService extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase {
public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase {
private AsyncTraceSearchListener searchListener;
public AsyncTraceSearchService(AsyncTraceSearchListener searchListener) {
public AsyncTraceSearchServer(AsyncTraceSearchListener searchListener) {
this.searchListener = searchListener;
}
......
package com.a.eye.skywalking.network.grpc.provider;
package com.a.eye.skywalking.network.grpc.server;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
......@@ -7,10 +7,10 @@ import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc;
import com.a.eye.skywalking.network.listener.SpanStorageListener;
import io.grpc.stub.StreamObserver;
public class SpanStorageService extends SpanStorageServiceGrpc.SpanStorageServiceImplBase {
public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageServiceImplBase {
private SpanStorageListener listener;
public SpanStorageService(SpanStorageListener listener) {
public SpanStorageServer(SpanStorageListener listener) {
this.listener = listener;
}
......
package com.a.eye.skywalking.network.grpc.provider;
package com.a.eye.skywalking.network.grpc.server;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.grpc.TraceId;
......@@ -9,11 +9,11 @@ import io.grpc.stub.StreamObserver;
/**
* Created by xin on 2016/11/12.
*/
public class TraceSearchService extends TraceSearchServiceGrpc.TraceSearchServiceImplBase {
public class TraceSearchServer extends TraceSearchServiceGrpc.TraceSearchServiceImplBase {
private TraceSearchListener traceSearchListener;
public TraceSearchService(TraceSearchListener traceSearchListener) {
public TraceSearchServer(TraceSearchListener traceSearchListener) {
this.traceSearchListener = traceSearchListener;
}
......
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.logging.impl.log4j2.Log4j2Resolver;
import com.a.eye.skywalking.network.ServiceProvider;
import com.a.eye.skywalking.network.Server;
import com.a.eye.skywalking.registry.RegistryCenterFactory;
import com.a.eye.skywalking.registry.api.CenterType;
import com.a.eye.skywalking.registry.api.RegistryCenter;
......@@ -36,7 +36,7 @@ public class Main {
LogManager.setLogResolver(new Log4j2Resolver());
}
private static ServiceProvider provider;
private static Server server;
public static void main(String[] args) {
try {
......@@ -48,9 +48,9 @@ public class Main {
DataFilesManager.init();
provider = ServiceProvider.newBuilder(Config.Server.PORT).addSpanStorageService(new StorageListener())
server = Server.newBuilder(Config.Server.PORT).addSpanStorageService(new StorageListener())
.addAsyncTraceSearchService(new SearchListener()).build();
provider.start();
server.start();
if (logger.isDebugEnable()) {
logger.debug("Service provider started.");
......@@ -64,7 +64,7 @@ public class Main {
e.printStackTrace();
logger.error("SkyWalking storage server start failure.", e);
} finally {
provider.stop();
server.stop();
}
}
......
import com.a.eye.skywalking.network.ConsumerProvider;
import java.util.concurrent.CountDownLatch;
public class StorageClient {
private static ConsumerProvider consumerProvider;
private static int THREAD_COUNT = 4;
private static final long COUNT = 1_000_000_000;
public static void main(String[] args) throws InterruptedException {
consumerProvider = ConsumerProvider.init("10.128.7.241", 34000);
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
new StorageThread(consumerProvider, COUNT, countDownLatch).start();
new StorageThread(COUNT, countDownLatch).start();
}
countDownLatch.await();
......
import com.a.eye.skywalking.network.ConsumerProvider;
import com.a.eye.skywalking.network.Client;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.storage.util.NetUtils;
import java.util.concurrent.CountDownLatch;
public class StorageThread extends Thread {
private SpanStorageConsumer consumer;
private long count;
private CountDownLatch countDownLatch;
private SpanStorageClient consumer;
private long count;
private CountDownLatch countDownLatch;
StorageThread(ConsumerProvider consumerProvider, long count, CountDownLatch countDownLatch) {
consumer = consumerProvider.newSpanStorageConsumer();
StorageThread(long count, CountDownLatch countDownLatch) {
consumer = new Client("10.128.7.241", 34000).newSpanStorageConsumer();
this.count = count;
this.countDownLatch = countDownLatch;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册