提交 8196c0c1 编写于 作者: A ascrutae

完成network功能

上级 d70a40a3
package com.a.eye.skywalking.network;
import com.a.eye.skywalking.network.grpc.services.SpanStorageService;
import com.a.eye.skywalking.network.grpc.services.TraceSearchService;
import com.a.eye.skywalking.network.listener.SpanStorageNotifier;
import com.a.eye.skywalking.network.listener.TraceSearchNotifier;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class TransferService {
private static final int DEFAULT_SERVICE_PORT = 34000;
private Server server;
private TransferService(Server server) {
this.server = server;
}
public void start() throws IOException, InterruptedException {
server.start();
// 当JVM停止之后,Server也需要停止
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
TransferService.this.stop();
}
});
blockUntilShutdown();
}
private void stop() {
if (server != null) {
server.shutdown();
}
}
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static class TransferServiceBuilder {
private TransferServiceBuilder(int port) {
if (port < 0 && port > 65535) {
port = DEFAULT_SERVICE_PORT;
}
serverBuilder = ServerBuilder.forPort(port);
}
private ServerBuilder serverBuilder;
public static TransferServiceBuilder newBuilder(int port) {
return new TransferServiceBuilder(port);
}
public TransferService build() {
return new TransferService(serverBuilder.build());
}
public TransferServiceBuilder startSpanStorageService(SpanStorageNotifier spanStorageListener) {
serverBuilder.addService(new SpanStorageService(spanStorageListener));
return this;
}
public TransferServiceBuilder startTraceSearchService(TraceSearchNotifier traceSearchNotifier) {
serverBuilder.addService(new TraceSearchService(traceSearchNotifier));
return this;
}
}
}
package com.a.eye.skywalking.network.grpc.services;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.SendResult;
import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc;
import com.a.eye.skywalking.network.listener.SpanStorageNotifier;
import io.grpc.stub.StreamObserver;
public class SpanStorageService extends SpanStorageServiceGrpc.SpanStorageServiceImplBase {
private SpanStorageNotifier listener;
public SpanStorageService(SpanStorageNotifier listener) {
this.listener = listener;
}
@Override
public StreamObserver<AckSpan> storageACKSpan(final StreamObserver<SendResult> responseObserver) {
return new StreamObserver<AckSpan>() {
@Override
public void onNext(AckSpan value) {
listener.storage(value);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
responseObserver.onNext(SendResult.newBuilder().setResult(true).build());
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<RequestSpan> storageRequestSpan(final StreamObserver<SendResult> responseObserver) {
return new StreamObserver<RequestSpan>() {
@Override
public void onNext(RequestSpan value) {
listener.storage(value);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
responseObserver.onNext(SendResult.newBuilder().setResult(true).build());
responseObserver.onCompleted();
}
};
}
}
package com.a.eye.skywalking.network.grpc.services;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.grpc.Span;
import com.a.eye.skywalking.network.grpc.TraceSearchCondition;
import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc;
import com.a.eye.skywalking.network.listener.TraceSearchNotifier;
import io.grpc.stub.StreamObserver;
import java.util.List;
/**
* Created by xin on 2016/11/12.
*/
public class TraceSearchService extends TraceSearchServiceGrpc.TraceSearchServiceImplBase {
private TraceSearchNotifier traceSearchNotifier;
public TraceSearchService(TraceSearchNotifier traceSearchNotifier) {
this.traceSearchNotifier = traceSearchNotifier;
}
@Override
public void search(TraceSearchCondition request, StreamObserver<SearchResult> responseObserver) {
List<Span> spans = traceSearchNotifier.search(request.getTraceid());
responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build());
responseObserver.onCompleted();
}
}
package com.a.eye.skywalking.network.listener;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
public interface SpanStorageNotifier {
boolean storage(RequestSpan requestSpan);
boolean storage(AckSpan ackSpan);
}
package com.a.eye.skywalking.network.listener;
import com.a.eye.skywalking.network.grpc.Span;
import java.util.List;
/**
* Created by xin on 2016/11/12.
*/
public interface TraceSearchNotifier {
List<Span> search(String traceId);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册