From 2f057f8befb44d2c2f8217b6314b1109941c8822 Mon Sep 17 00:00:00 2001 From: wusheng Date: Sat, 3 Dec 2016 18:30:49 +0800 Subject: [PATCH] 1. add TraceSearchClient to support sync and async search. 2.adjust SpanStorageServer and AsyncTraceSearchServer to process onError on server side. --- .../com/a/eye/skywalking/network/Client.java | 11 +++--- .../grpc/client/TraceSearchClient.java | 39 +++++++++++++++++-- .../grpc/server/AsyncTraceSearchServer.java | 17 ++++++-- .../grpc/server/SpanStorageServer.java | 5 +++ .../grpc/server/TraceSearchServer.java | 14 +++---- .../listener/client/SearchClientListener.java | 14 +++++++ .../AsyncTraceSearchServerListener.java | 13 ------- .../listener/server/TraceSearchListener.java | 3 +- .../src/main/proto/TraceSearchService.proto | 2 +- 9 files changed, 82 insertions(+), 36 deletions(-) create mode 100644 skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/SearchClientListener.java delete mode 100644 skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/AsyncTraceSearchServerListener.java diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java index 622b4e9dd4..d58c67eeb8 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java @@ -7,7 +7,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; public class Client { - private ManagedChannel channel; + private ManagedChannel channel; public Client(String ip, int address) { channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build(); @@ -17,16 +17,15 @@ public class Client { return new SpanStorageClient(channel, listener); } - - public TraceSearchClient newTraceSearchClient(StorageClientListener listener){ - return new TraceSearchClient(channel, listener); + public TraceSearchClient newTraceSearchClient() { + return new TraceSearchClient(channel); } - public void shutdown(){ + public void shutdown() { channel.shutdownNow(); } - public boolean isShutdown(){ + public boolean isShutdown() { return channel.isShutdown() || channel.isTerminated(); } } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java index 9f48834376..1ee52fdd53 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java @@ -1,18 +1,49 @@ package com.a.eye.skywalking.network.grpc.client; +import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc; +import com.a.eye.skywalking.network.grpc.QueryTask; +import com.a.eye.skywalking.network.grpc.SearchResult; import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc; -import com.a.eye.skywalking.network.listener.client.StorageClientListener; +import com.a.eye.skywalking.network.listener.client.SearchClientListener; import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; /** * Created by wusheng on 2016/11/26. */ public class TraceSearchClient { + private final AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceStub traceSearchServiceStub; + private final TraceSearchServiceGrpc.TraceSearchServiceBlockingStub traceSearchServiceBlockingStub; - private final TraceSearchServiceGrpc.TraceSearchServiceStub traceSearchServiceStub; + public TraceSearchClient(ManagedChannel channel) { + this.traceSearchServiceStub = AsyncTraceSearchServiceGrpc.newStub(channel); + this.traceSearchServiceBlockingStub = TraceSearchServiceGrpc.newBlockingStub(channel); + } + + public void search(QueryTask queryTask, final SearchClientListener listener){ + StreamObserver serverStreamObserver = new StreamObserver() { + @Override + public void onNext(SearchResult searchResult) { + listener.onReturn(searchResult); + } + + @Override + public void onError(Throwable throwable) { + listener.onError(throwable); + } - public TraceSearchClient(ManagedChannel channel, StorageClientListener listener) { - this.traceSearchServiceStub = TraceSearchServiceGrpc.newStub(channel); + @Override + public void onCompleted() { + listener.onFinished(); + } + }; + + StreamObserver searchResult = traceSearchServiceStub.search(serverStreamObserver); + searchResult.onNext(queryTask); + searchResult.onCompleted(); } + public SearchResult search(QueryTask queryTask){ + return traceSearchServiceBlockingStub.search(queryTask); + } } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java index bfeaad84c8..8561ff3ac9 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java @@ -4,7 +4,7 @@ import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc; import com.a.eye.skywalking.network.grpc.QueryTask; import com.a.eye.skywalking.network.grpc.SearchResult; import com.a.eye.skywalking.network.grpc.Span; -import com.a.eye.skywalking.network.listener.server.AsyncTraceSearchServerListener; +import com.a.eye.skywalking.network.listener.server.TraceSearchListener; import io.grpc.stub.StreamObserver; import java.util.List; @@ -14,9 +14,9 @@ import java.util.List; */ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase { - private AsyncTraceSearchServerListener searchListener; + private TraceSearchListener searchListener; - public AsyncTraceSearchServer(AsyncTraceSearchServerListener searchListener) { + public AsyncTraceSearchServer(TraceSearchListener searchListener) { this.searchListener = searchListener; } @@ -34,11 +34,20 @@ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTra @Override public void onError(Throwable t) { + SearchResult.Builder builder = SearchResult.newBuilder(); + builder = builder.setTaskId(taskId); + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); } @Override public void onCompleted() { - responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).setTaskId(taskId).build()); + SearchResult.Builder builder = SearchResult.newBuilder(); + if(spans != null) { + builder = builder.addAllSpans(spans); + } + builder = builder.setTaskId(taskId); + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } }; diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java index cca7710c32..19fc6766af 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java @@ -10,6 +10,7 @@ import io.grpc.stub.StreamObserver; public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageServiceImplBase { private SpanStorageServerListener listener; + public SpanStorageServer(SpanStorageServerListener listener) { this.listener = listener; } @@ -24,6 +25,8 @@ public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageService @Override public void onError(Throwable t) { + responseObserver.onNext(SendResult.newBuilder().setResult(false).build()); + responseObserver.onCompleted(); } @Override @@ -45,6 +48,8 @@ public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageService @Override public void onError(Throwable t) { + responseObserver.onNext(SendResult.newBuilder().setResult(false).build()); + responseObserver.onCompleted(); } @Override diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java index 45235c076d..14ac159205 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java @@ -1,11 +1,11 @@ package com.a.eye.skywalking.network.grpc.server; -import com.a.eye.skywalking.network.grpc.SearchResult; -import com.a.eye.skywalking.network.grpc.TraceId; -import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc; +import com.a.eye.skywalking.network.grpc.*; import com.a.eye.skywalking.network.listener.server.TraceSearchListener; import io.grpc.stub.StreamObserver; +import java.util.List; + /** * Created by xin on 2016/11/12. */ @@ -18,10 +18,10 @@ public class TraceSearchServer extends TraceSearchServiceGrpc.TraceSearchService } @Override - public void search(TraceId request, StreamObserver responseObserver) { - // List spans = traceSearchListener.search(request.getTraceid()); - // responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build()); - // responseObserver.onCompleted(); + public void search(QueryTask request, StreamObserver responseObserver) { + List spans = traceSearchListener.search(request.getTraceId()); + responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build()); + responseObserver.onCompleted(); } } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/SearchClientListener.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/SearchClientListener.java new file mode 100644 index 0000000000..101718e163 --- /dev/null +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/SearchClientListener.java @@ -0,0 +1,14 @@ +package com.a.eye.skywalking.network.listener.client; + +import com.a.eye.skywalking.network.grpc.SearchResult; + +/** + * Created by wusheng on 2016/12/3. + */ +public interface SearchClientListener { + void onError(Throwable throwable); + + void onReturn(SearchResult result); + + void onFinished(); +} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/AsyncTraceSearchServerListener.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/AsyncTraceSearchServerListener.java deleted file mode 100644 index 7959c0f2f3..0000000000 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/AsyncTraceSearchServerListener.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.a.eye.skywalking.network.listener.server; - -import com.a.eye.skywalking.network.grpc.Span; -import com.a.eye.skywalking.network.grpc.TraceId; - -import java.util.List; - -/** - * Created by xin on 2016/11/15. - */ -public interface AsyncTraceSearchServerListener { - List search(TraceId traceId); -} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java index bd5f247deb..ae2c6d90cb 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java @@ -1,9 +1,10 @@ package com.a.eye.skywalking.network.listener.server; import com.a.eye.skywalking.network.grpc.Span; +import com.a.eye.skywalking.network.grpc.TraceId; import java.util.List; public interface TraceSearchListener{ - List search(String traceId); + List search(TraceId traceId); } diff --git a/skywalking-network/src/main/proto/TraceSearchService.proto b/skywalking-network/src/main/proto/TraceSearchService.proto index f7c4d53c42..4636aa9d40 100644 --- a/skywalking-network/src/main/proto/TraceSearchService.proto +++ b/skywalking-network/src/main/proto/TraceSearchService.proto @@ -11,7 +11,7 @@ service AsyncTraceSearchService { } service TraceSearchService { - rpc search (TraceId) returns (SearchResult) { + rpc search (QueryTask) returns (SearchResult) { }; } -- GitLab