diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java index 622b4e9dd4d1e927290884a0beb33197c28d126c..d58c67eeb800753c3e17dba58419328e570ad833 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/Client.java @@ -7,7 +7,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; public class Client { - private ManagedChannel channel; + private ManagedChannel channel; public Client(String ip, int address) { channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build(); @@ -17,16 +17,15 @@ public class Client { return new SpanStorageClient(channel, listener); } - - public TraceSearchClient newTraceSearchClient(StorageClientListener listener){ - return new TraceSearchClient(channel, listener); + public TraceSearchClient newTraceSearchClient() { + return new TraceSearchClient(channel); } - public void shutdown(){ + public void shutdown() { channel.shutdownNow(); } - public boolean isShutdown(){ + public boolean isShutdown() { return channel.isShutdown() || channel.isTerminated(); } } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java index 9f48834376aeff1c3f3a4bcb3dcfb31a6162125e..1ee52fdd53d364a740f7b979567e13cf1f1ca3c4 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/client/TraceSearchClient.java @@ -1,18 +1,49 @@ package com.a.eye.skywalking.network.grpc.client; +import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc; +import com.a.eye.skywalking.network.grpc.QueryTask; +import com.a.eye.skywalking.network.grpc.SearchResult; import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc; -import com.a.eye.skywalking.network.listener.client.StorageClientListener; +import com.a.eye.skywalking.network.listener.client.SearchClientListener; import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; /** * Created by wusheng on 2016/11/26. */ public class TraceSearchClient { + private final AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceStub traceSearchServiceStub; + private final TraceSearchServiceGrpc.TraceSearchServiceBlockingStub traceSearchServiceBlockingStub; - private final TraceSearchServiceGrpc.TraceSearchServiceStub traceSearchServiceStub; + public TraceSearchClient(ManagedChannel channel) { + this.traceSearchServiceStub = AsyncTraceSearchServiceGrpc.newStub(channel); + this.traceSearchServiceBlockingStub = TraceSearchServiceGrpc.newBlockingStub(channel); + } + + public void search(QueryTask queryTask, final SearchClientListener listener){ + StreamObserver serverStreamObserver = new StreamObserver() { + @Override + public void onNext(SearchResult searchResult) { + listener.onReturn(searchResult); + } + + @Override + public void onError(Throwable throwable) { + listener.onError(throwable); + } - public TraceSearchClient(ManagedChannel channel, StorageClientListener listener) { - this.traceSearchServiceStub = TraceSearchServiceGrpc.newStub(channel); + @Override + public void onCompleted() { + listener.onFinished(); + } + }; + + StreamObserver searchResult = traceSearchServiceStub.search(serverStreamObserver); + searchResult.onNext(queryTask); + searchResult.onCompleted(); } + public SearchResult search(QueryTask queryTask){ + return traceSearchServiceBlockingStub.search(queryTask); + } } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java index bfeaad84c869c1e6905f4189f67aca29a03ab975..8561ff3ac981b0360eed95b72463aa3183d8c4ae 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/AsyncTraceSearchServer.java @@ -4,7 +4,7 @@ import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc; import com.a.eye.skywalking.network.grpc.QueryTask; import com.a.eye.skywalking.network.grpc.SearchResult; import com.a.eye.skywalking.network.grpc.Span; -import com.a.eye.skywalking.network.listener.server.AsyncTraceSearchServerListener; +import com.a.eye.skywalking.network.listener.server.TraceSearchListener; import io.grpc.stub.StreamObserver; import java.util.List; @@ -14,9 +14,9 @@ import java.util.List; */ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase { - private AsyncTraceSearchServerListener searchListener; + private TraceSearchListener searchListener; - public AsyncTraceSearchServer(AsyncTraceSearchServerListener searchListener) { + public AsyncTraceSearchServer(TraceSearchListener searchListener) { this.searchListener = searchListener; } @@ -34,11 +34,20 @@ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTra @Override public void onError(Throwable t) { + SearchResult.Builder builder = SearchResult.newBuilder(); + builder = builder.setTaskId(taskId); + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); } @Override public void onCompleted() { - responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).setTaskId(taskId).build()); + SearchResult.Builder builder = SearchResult.newBuilder(); + if(spans != null) { + builder = builder.addAllSpans(spans); + } + builder = builder.setTaskId(taskId); + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } }; diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java index cca7710c324f4250943c7d30424c665e398223d9..19fc6766af163e5938757e132888d06bad958659 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/SpanStorageServer.java @@ -10,6 +10,7 @@ import io.grpc.stub.StreamObserver; public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageServiceImplBase { private SpanStorageServerListener listener; + public SpanStorageServer(SpanStorageServerListener listener) { this.listener = listener; } @@ -24,6 +25,8 @@ public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageService @Override public void onError(Throwable t) { + responseObserver.onNext(SendResult.newBuilder().setResult(false).build()); + responseObserver.onCompleted(); } @Override @@ -45,6 +48,8 @@ public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageService @Override public void onError(Throwable t) { + responseObserver.onNext(SendResult.newBuilder().setResult(false).build()); + responseObserver.onCompleted(); } @Override diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java index 45235c076dbff59209297d87ad6bdc32020b9e18..14ac159205d919facc7c38ee1383aca57cd9e81a 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/grpc/server/TraceSearchServer.java @@ -1,11 +1,11 @@ package com.a.eye.skywalking.network.grpc.server; -import com.a.eye.skywalking.network.grpc.SearchResult; -import com.a.eye.skywalking.network.grpc.TraceId; -import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc; +import com.a.eye.skywalking.network.grpc.*; import com.a.eye.skywalking.network.listener.server.TraceSearchListener; import io.grpc.stub.StreamObserver; +import java.util.List; + /** * Created by xin on 2016/11/12. */ @@ -18,10 +18,10 @@ public class TraceSearchServer extends TraceSearchServiceGrpc.TraceSearchService } @Override - public void search(TraceId request, StreamObserver responseObserver) { - // List spans = traceSearchListener.search(request.getTraceid()); - // responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build()); - // responseObserver.onCompleted(); + public void search(QueryTask request, StreamObserver responseObserver) { + List spans = traceSearchListener.search(request.getTraceId()); + responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build()); + responseObserver.onCompleted(); } } diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/SearchClientListener.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/SearchClientListener.java new file mode 100644 index 0000000000000000000000000000000000000000..101718e1634f0e129f6a8ab9a2b06adb782c08a3 --- /dev/null +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/client/SearchClientListener.java @@ -0,0 +1,14 @@ +package com.a.eye.skywalking.network.listener.client; + +import com.a.eye.skywalking.network.grpc.SearchResult; + +/** + * Created by wusheng on 2016/12/3. + */ +public interface SearchClientListener { + void onError(Throwable throwable); + + void onReturn(SearchResult result); + + void onFinished(); +} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/AsyncTraceSearchServerListener.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/AsyncTraceSearchServerListener.java deleted file mode 100644 index 7959c0f2f3142ac65cddf50c73032dcba829a83a..0000000000000000000000000000000000000000 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/AsyncTraceSearchServerListener.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.a.eye.skywalking.network.listener.server; - -import com.a.eye.skywalking.network.grpc.Span; -import com.a.eye.skywalking.network.grpc.TraceId; - -import java.util.List; - -/** - * Created by xin on 2016/11/15. - */ -public interface AsyncTraceSearchServerListener { - List search(TraceId traceId); -} diff --git a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java index bd5f247debba99fa54ee3cc71b28408a61542a17..ae2c6d90cbbb49d6d39c7e6f390200ef641fcaa8 100644 --- a/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java +++ b/skywalking-network/src/main/java/com/a/eye/skywalking/network/listener/server/TraceSearchListener.java @@ -1,9 +1,10 @@ package com.a.eye.skywalking.network.listener.server; import com.a.eye.skywalking.network.grpc.Span; +import com.a.eye.skywalking.network.grpc.TraceId; import java.util.List; public interface TraceSearchListener{ - List search(String traceId); + List search(TraceId traceId); } diff --git a/skywalking-network/src/main/proto/TraceSearchService.proto b/skywalking-network/src/main/proto/TraceSearchService.proto index f7c4d53c429cc209cdd76996d96efd0d8ee3d8a7..4636aa9d40926674ab19278309e94c5b2239fc50 100644 --- a/skywalking-network/src/main/proto/TraceSearchService.proto +++ b/skywalking-network/src/main/proto/TraceSearchService.proto @@ -11,7 +11,7 @@ service AsyncTraceSearchService { } service TraceSearchService { - rpc search (TraceId) returns (SearchResult) { + rpc search (QueryTask) returns (SearchResult) { }; }