提交 4ebacf09 编写于 作者: A ascrutae

完善Storage压测代码,修复部分bug

上级 f723250d
package com.a.eye.skywalking.network;
import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc;
import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer;
import com.a.eye.skywalking.network.grpc.provider.SpanStorageService;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub;
public class ConsumerProvider {
private static ConsumerProvider INSTANCE;
private ManagedChannel channel;
private SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub;
private ConsumerProvider(String ip, int address) {
channel = ManagedChannelBuilder.forAddress(ip, address).usePlaintext(true).build();
spanStorageStub = newStub(channel);
}
public static ConsumerProvider INSTANCE() {
return INSTANCE;
}
public SpanStorageConsumer newSpanStorageConsumer() {
return new SpanStorageConsumer(spanStorageStub);
}
public static ConsumerProvider init(String ip, int address) {
return INSTANCE = new ConsumerProvider(ip, address);
}
}
......@@ -43,6 +43,7 @@ public class ServiceProvider {
public static class TransferServiceBuilder {
private TransferServiceBuilder(int port) {
serverBuilder = NettyServerBuilder.forPort(port);
serverBuilder.maxConcurrentCallsPerConnection(4);
}
private NettyServerBuilder serverBuilder;
......
package com.a.eye.skywalking.network.exception;
/**
* Created by xin on 2016/11/23.
*/
public class ConsumeSpanDataFailedException extends RuntimeException {
public ConsumeSpanDataFailedException(Exception e) {
}
}
package com.a.eye.skywalking.network.grpc.consumer;
import com.a.eye.skywalking.network.exception.ConsumeSpanDataFailedException;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.SendResult;
import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc;
import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
public class SpanStorageConsumer {
private final SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub;
public SpanStorageConsumer(SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageStub) {
this.spanStorageStub = spanStorageStub;
}
public void consumeRequestSpan(RequestSpan... requestSpan) {
StreamObserver<RequestSpan> requestSpanStreamObserver =
spanStorageStub.storageRequestSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
}
});
for (RequestSpan span : requestSpan) {
requestSpanStreamObserver.onNext(span);
while (!((CallStreamObserver<RequestSpan>) requestSpanStreamObserver).isReady()) {
try {
Thread.currentThread().sleep(1);
} catch (InterruptedException e) {
throw new ConsumeSpanDataFailedException(e);
}
}
}
requestSpanStreamObserver.onCompleted();
}
public void consumeACKSpan(AckSpan... ackSpan) {
StreamObserver<AckSpan> ackSpanStreamObserver =
spanStorageStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
}
});
for (AckSpan span : ackSpan) {
ackSpanStreamObserver.onNext(span);
while (!((CallStreamObserver<AckSpan>) ackSpanStreamObserver).isReady()) {
try {
Thread.currentThread().sleep(1);
} catch (InterruptedException e) {
throw new ConsumeSpanDataFailedException(e);
}
}
}
ackSpanStreamObserver.onCompleted();
}
}
......@@ -132,7 +132,7 @@
<unzip src="${project.basedir}/lib/dataindex-es-5.0.1.zip"
dest="${project.build.directory}/install/data/index"/>
<chmod dir="${project.build.directory}/install/data/index/bin" perm="ugo+rx"
includes="**"/>
includes="*"/>
</target>
</configuration>
<goals>
......@@ -157,7 +157,7 @@
<configuration>
<target>
<chmod dir="${project.build.directory}/install/bin" perm="ugo+rx"
includes="**"/>
includes="*"/>
<tar destfile="${project.build.directory}/install.tar" basedir="${project.build.directory}/install">
</tar>
</target>
......
......@@ -17,17 +17,18 @@ public class SpanDataConsumer implements IConsumer<SpanData> {
private static ILog logger = LogManager.getLogger(SpanDataConsumer.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
@Override
public void init() {
fileWriter = new DataFileWriter();
operator = IndexOperatorFactory.createIndexOperator();
}
@Override
public void consume(List<SpanData> data) {
IndexMetaCollection collection = fileWriter.write(data);
IndexOperator operator = IndexOperatorFactory.createIndexOperator();
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("SpanDataConsumer")
......
......@@ -18,7 +18,7 @@ public class IndexOperatorFactory {
public static IndexOperator createIndexOperator() {
try {
return new IndexOperator(new PreBuiltTransportClient(Settings.EMPTY).addTransportAddress(
new InetSocketTransportAddress(InetAddress.getLocalHost(), NetUtils.getIndexServerPort()
new InetSocketTransportAddress(InetAddress.getLoopbackAddress(), NetUtils.getIndexServerPort()
)));
} catch (Exception e) {
throw new IndexOperatorInitializeFailedException("Failed to initialize operator.", e);
......
import com.a.eye.skywalking.network.dependencies.io.grpc.ManagedChannel;
import com.a.eye.skywalking.network.dependencies.io.grpc.ManagedChannelBuilder;
import com.a.eye.skywalking.network.dependencies.io.grpc.stub.ClientCallStreamObserver;
import com.a.eye.skywalking.network.dependencies.io.grpc.stub.ServerCallStreamObserver;
import com.a.eye.skywalking.network.dependencies.io.grpc.stub.StreamObserver;
import com.a.eye.skywalking.network.grpc.*;
import com.a.eye.skywalking.network.ConsumerProvider;
import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub;
import java.util.concurrent.CountDownLatch;
public class StorageClient {
private static ManagedChannel channel =
ManagedChannelBuilder.forAddress("127.0.0.1", 34000).usePlaintext(true).build();
private static ConsumerProvider consumerProvider;
private static SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageServiceStub = newStub(channel);
private static long endTime1 = 0;
private static long endTime2 = 0;
private static int THREAD_COUNT = 4;
private static final long COUNT = 1_000_000_000;
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
for (int i = 0; i < 1; i++) {
long value = System.currentTimeMillis();
RequestSpan requestSpan =
RequestSpan.newBuilder().setSpanType(1).setAddress("127.0.0.1").setApplicationId("1")
.setCallType("1").setLevelId(0).setProcessNo("19287")
.setStartDate(System.currentTimeMillis()).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(value)
.addSegments(8504828).addSegments(2277).addSegments(53).addSegments(3).build())
.setUserId("1").setViewPointId("http://localhost:8080/wwww/test/helloWorld").build();
AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277)
.addSegments(53).addSegments(3).build()).setStatusCode(0)
.setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
StreamObserver<AckSpan> ackSpanStreamObserver =
spanStorageServiceStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
endTime1 = System.currentTimeMillis();
}
});
StreamObserver<RequestSpan> requestSpanStreamObserver =
spanStorageServiceStub.storageRequestSpan(new StreamObserver<SendResult>() {
@Override
public void onNext(SendResult sendResult) {
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
endTime2 = System.currentTimeMillis();
}
});
for (int j = 0; j < 1; j++) {
requestSpanStreamObserver.onNext(requestSpan);
ackSpanStreamObserver.onNext(ackSpan);
}
ClientCallStreamObserver<RequestSpan> newRequestSpanStreamObserver =
(ClientCallStreamObserver<RequestSpan>) requestSpanStreamObserver;
while (!newRequestSpanStreamObserver.isReady()) {
Thread.sleep(1);
}
ackSpanStreamObserver.onCompleted();
requestSpanStreamObserver.onCompleted();
if (i % 1_000 == 0) {
System.out.println(i);
}
consumerProvider = ConsumerProvider.init("10.128.7.241", 34000);
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
new StorageThread(consumerProvider, COUNT, countDownLatch).start();
}
Thread.sleep(1000L);
System.out.println("save execute in " + (endTime1 - startTime) + "ms");
System.out.println("save execute2 in " + (endTime2 - startTime) + "ms");
Thread.sleep(10000);
countDownLatch.await();
}
}
import com.a.eye.skywalking.network.ConsumerProvider;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.network.grpc.consumer.SpanStorageConsumer;
import com.a.eye.skywalking.storage.util.NetUtils;
import java.util.concurrent.CountDownLatch;
public class StorageThread extends Thread {
private SpanStorageConsumer consumer;
private long count;
private CountDownLatch countDownLatch;
StorageThread(ConsumerProvider consumerProvider, long count, CountDownLatch countDownLatch) {
consumer = consumerProvider.newSpanStorageConsumer();
this.count = count;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
for (int i = 0; i < count; i++) {
long value = System.currentTimeMillis();
RequestSpan requestSpan =
RequestSpan.newBuilder().setSpanType(1).setAddress(NetUtils.getLocalAddress().toString())
.setApplicationId("1").setCallType("1").setLevelId(0).setProcessNo("19287")
.setStartDate(System.currentTimeMillis()).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setUserId("1")
.setViewPointId("http://localhost:8080/wwww/test/helloWorld").build();
AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277)
.addSegments(Thread.currentThread().getId()).addSegments(3).build()).setStatusCode(0)
.setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
consumer.consumeACKSpan(ackSpan);
consumer.consumeRequestSpan(requestSpan);
if (i % 1_000 == 0) {
System.out.println(i + " " + value);
}
}
countDownLatch.countDown();
}
}
......@@ -10,7 +10,7 @@ import static com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc.newS
public class SearchClient {
private static ManagedChannel channel =
ManagedChannelBuilder.forAddress("127.0.0.1", 34000).usePlaintext(true).build();
ManagedChannelBuilder.forAddress("10.128.7.241", 34000).usePlaintext(true).build();
private static AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceStub searchServiceStub = newStub(channel);
......@@ -36,8 +36,8 @@ public class SearchClient {
StreamObserver<QueryTask> searchResult = searchServiceStub.search(serverStreamObserver);
searchResult.onNext(QueryTask.newBuilder().setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(1479717228982L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setTaskId(1).build());
TraceId.newBuilder().addSegments(201611).addSegments(1479803629139L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).build());
searchResult.onCompleted();
Thread.sleep(10000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册