提交 77f096d4 编写于 作者: wu-sheng's avatar wu-sheng

Using waiting grpc send status, instead of reponse. Avoid performance loss.

上级 8a7a6f04
......@@ -10,6 +10,7 @@ import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
public class SpanStorageClient {
......@@ -23,54 +24,59 @@ public class SpanStorageClient {
}
public void sendRequestSpan(List<RequestSpan> requestSpan) {
StreamObserver<RequestSpan> requestSpanStreamObserver =
spanStorageStub.storageRequestSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
listener.onBatchFinished(sendResult);
}
@Override
public void onError(Throwable throwable) {
listener.onError(throwable);
}
@Override
public void onCompleted() {
}
});
StreamObserver<RequestSpan> requestSpanStreamObserver = spanStorageStub.storageRequestSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
}
@Override
public void onError(Throwable throwable) {
listener.onError(throwable);
}
@Override
public void onCompleted() {
listener.onBatchFinished();
}
});
for (RequestSpan span : requestSpan) {
requestSpanStreamObserver.onNext(span);
}
while (!((CallStreamObserver<RequestSpan>) requestSpanStreamObserver).isReady()) {
LockSupport.parkNanos(1);
}
requestSpanStreamObserver.onCompleted();
}
public void sendACKSpan(List<AckSpan> ackSpan) {
StreamObserver<AckSpan> ackSpanStreamObserver =
spanStorageStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
listener.onBatchFinished(sendResult);
}
StreamObserver<AckSpan> ackSpanStreamObserver = spanStorageStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
@Override
public void onError(Throwable throwable) {
listener.onError(throwable);
}
}
@Override
public void onCompleted() {
@Override
public void onError(Throwable throwable) {
listener.onError(throwable);
}
}
});
@Override
public void onCompleted() {
listener.onBatchFinished();
}
});
for (AckSpan span : ackSpan) {
ackSpanStreamObserver.onNext(span);
}
while (!((CallStreamObserver<AckSpan>) ackSpanStreamObserver).isReady()) {
LockSupport.parkNanos(1);
}
ackSpanStreamObserver.onCompleted();
}
......
......@@ -8,5 +8,5 @@ import com.a.eye.skywalking.network.grpc.SendResult;
public interface StorageClientListener {
void onError(Throwable throwable);
void onBatchFinished(SendResult sendResult);
void onBatchFinished();
}
......@@ -134,7 +134,7 @@ public class Agent2RoutingClient extends Thread {
}
@Override
public void onBatchFinished(SendResult sendResult) {
public void onBatchFinished() {
batchFinished = true;
HealthCollector.getCurrentHeathReading("Agent2RoutingClient").updateData(HeathReading.INFO, "batch send data to routing node.");
}
......
......@@ -76,7 +76,7 @@ public class StorageThread extends Thread {
}
@Override
public void onBatchFinished(SendResult sendResult) {
public void onBatchFinished() {
isCompleted = true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册