提交 2f057f8b 编写于 作者: wu-sheng's avatar wu-sheng

1. add TraceSearchClient to support sync and async search. 2.adjust...

1. add TraceSearchClient to support sync and async search. 2.adjust SpanStorageServer and AsyncTraceSearchServer to process onError on server side.
上级 b73e0366
......@@ -7,7 +7,7 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
public class Client {
private ManagedChannel channel;
private ManagedChannel channel;
public Client(String ip, int address) {
channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build();
......@@ -17,16 +17,15 @@ public class Client {
return new SpanStorageClient(channel, listener);
}
public TraceSearchClient newTraceSearchClient(StorageClientListener listener){
return new TraceSearchClient(channel, listener);
public TraceSearchClient newTraceSearchClient() {
return new TraceSearchClient(channel);
}
public void shutdown(){
public void shutdown() {
channel.shutdownNow();
}
public boolean isShutdown(){
public boolean isShutdown() {
return channel.isShutdown() || channel.isTerminated();
}
}
package com.a.eye.skywalking.network.grpc.client;
import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc;
import com.a.eye.skywalking.network.grpc.QueryTask;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc;
import com.a.eye.skywalking.network.listener.client.StorageClientListener;
import com.a.eye.skywalking.network.listener.client.SearchClientListener;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
/**
* Created by wusheng on 2016/11/26.
*/
public class TraceSearchClient {
private final AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceStub traceSearchServiceStub;
private final TraceSearchServiceGrpc.TraceSearchServiceBlockingStub traceSearchServiceBlockingStub;
private final TraceSearchServiceGrpc.TraceSearchServiceStub traceSearchServiceStub;
public TraceSearchClient(ManagedChannel channel) {
this.traceSearchServiceStub = AsyncTraceSearchServiceGrpc.newStub(channel);
this.traceSearchServiceBlockingStub = TraceSearchServiceGrpc.newBlockingStub(channel);
}
public void search(QueryTask queryTask, final SearchClientListener listener){
StreamObserver<SearchResult> serverStreamObserver = new StreamObserver<SearchResult>() {
@Override
public void onNext(SearchResult searchResult) {
listener.onReturn(searchResult);
}
@Override
public void onError(Throwable throwable) {
listener.onError(throwable);
}
public TraceSearchClient(ManagedChannel channel, StorageClientListener listener) {
this.traceSearchServiceStub = TraceSearchServiceGrpc.newStub(channel);
@Override
public void onCompleted() {
listener.onFinished();
}
};
StreamObserver<QueryTask> searchResult = traceSearchServiceStub.search(serverStreamObserver);
searchResult.onNext(queryTask);
searchResult.onCompleted();
}
public SearchResult search(QueryTask queryTask){
return traceSearchServiceBlockingStub.search(queryTask);
}
}
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc;
import com.a.eye.skywalking.network.grpc.QueryTask;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.grpc.Span;
import com.a.eye.skywalking.network.listener.server.AsyncTraceSearchServerListener;
import com.a.eye.skywalking.network.listener.server.TraceSearchListener;
import io.grpc.stub.StreamObserver;
import java.util.List;
......@@ -14,9 +14,9 @@ import java.util.List;
*/
public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase {
private AsyncTraceSearchServerListener searchListener;
private TraceSearchListener searchListener;
public AsyncTraceSearchServer(AsyncTraceSearchServerListener searchListener) {
public AsyncTraceSearchServer(TraceSearchListener searchListener) {
this.searchListener = searchListener;
}
......@@ -34,11 +34,20 @@ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTra
@Override
public void onError(Throwable t) {
SearchResult.Builder builder = SearchResult.newBuilder();
builder = builder.setTaskId(taskId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
@Override
public void onCompleted() {
responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).setTaskId(taskId).build());
SearchResult.Builder builder = SearchResult.newBuilder();
if(spans != null) {
builder = builder.addAllSpans(spans);
}
builder = builder.setTaskId(taskId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
};
......
......@@ -10,6 +10,7 @@ import io.grpc.stub.StreamObserver;
public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageServiceImplBase {
private SpanStorageServerListener listener;
public SpanStorageServer(SpanStorageServerListener listener) {
this.listener = listener;
}
......@@ -24,6 +25,8 @@ public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageService
@Override
public void onError(Throwable t) {
responseObserver.onNext(SendResult.newBuilder().setResult(false).build());
responseObserver.onCompleted();
}
@Override
......@@ -45,6 +48,8 @@ public class SpanStorageServer extends SpanStorageServiceGrpc.SpanStorageService
@Override
public void onError(Throwable t) {
responseObserver.onNext(SendResult.newBuilder().setResult(false).build());
responseObserver.onCompleted();
}
@Override
......
package com.a.eye.skywalking.network.grpc.server;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc;
import com.a.eye.skywalking.network.grpc.*;
import com.a.eye.skywalking.network.listener.server.TraceSearchListener;
import io.grpc.stub.StreamObserver;
import java.util.List;
/**
* Created by xin on 2016/11/12.
*/
......@@ -18,10 +18,10 @@ public class TraceSearchServer extends TraceSearchServiceGrpc.TraceSearchService
}
@Override
public void search(TraceId request, StreamObserver<SearchResult> responseObserver) {
// List<Span> spans = traceSearchListener.search(request.getTraceid());
// responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build());
// responseObserver.onCompleted();
public void search(QueryTask request, StreamObserver<SearchResult> responseObserver) {
List<Span> spans = traceSearchListener.search(request.getTraceId());
responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build());
responseObserver.onCompleted();
}
}
package com.a.eye.skywalking.network.listener.client;
import com.a.eye.skywalking.network.grpc.SearchResult;
/**
* Created by wusheng on 2016/12/3.
*/
public interface SearchClientListener {
void onError(Throwable throwable);
void onReturn(SearchResult result);
void onFinished();
}
package com.a.eye.skywalking.network.listener.server;
import com.a.eye.skywalking.network.grpc.Span;
import com.a.eye.skywalking.network.grpc.TraceId;
import java.util.List;
/**
* Created by xin on 2016/11/15.
*/
public interface AsyncTraceSearchServerListener {
List<Span> search(TraceId traceId);
}
package com.a.eye.skywalking.network.listener.server;
import com.a.eye.skywalking.network.grpc.Span;
import com.a.eye.skywalking.network.grpc.TraceId;
import java.util.List;
public interface TraceSearchListener{
List<Span> search(String traceId);
List<Span> search(TraceId traceId);
}
......@@ -11,7 +11,7 @@ service AsyncTraceSearchService {
}
service TraceSearchService {
rpc search (TraceId) returns (SearchResult) {
rpc search (QueryTask) returns (SearchResult) {
};
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册