提交 e8eef8e0 编写于 作者: A ascrutae

修改TraceId生成规则

上级 d7937df9
package com.a.eye.skywalking.network;
import com.a.eye.skywalking.network.grpc.provider.AsyncTraceSearchService;
import com.a.eye.skywalking.network.grpc.provider.SpanStorageService;
import com.a.eye.skywalking.network.grpc.provider.TraceSearchService;
import com.a.eye.skywalking.network.listener.AsyncTraceSearchListener;
import com.a.eye.skywalking.network.listener.SpanStorageListener;
import com.a.eye.skywalking.network.listener.TraceSearchListener;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
......@@ -38,13 +42,14 @@ public class ServiceProvider {
public static class TransferServiceBuilder {
private TransferServiceBuilder(int port) {
serverBuilder = ServerBuilder.forPort(port);
serverBuilder = NettyServerBuilder.forPort(port);
}
private ServerBuilder serverBuilder;
private NettyServerBuilder serverBuilder;
public ServiceProvider build() {
return new ServiceProvider(serverBuilder.build());
return new ServiceProvider(serverBuilder.bossEventLoopGroup(new NioEventLoopGroup(1))
.workerEventLoopGroup(new NioEventLoopGroup()).build());
}
public TransferServiceBuilder addSpanStorageService(SpanStorageListener spanStorageListener) {
......@@ -56,5 +61,10 @@ public class ServiceProvider {
serverBuilder.addService(new TraceSearchService(traceSearchListener));
return this;
}
public TransferServiceBuilder addAsyncTraceSearchService(AsyncTraceSearchListener asyncTraceSearchListener){
serverBuilder.addService(new AsyncTraceSearchService(asyncTraceSearchListener));
return this;
}
}
}
package com.a.eye.skywalking.network.grpc.provider;
import com.a.eye.skywalking.network.grpc.AsyncTraceSearchServiceGrpc;
import com.a.eye.skywalking.network.grpc.QueryTask;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.listener.AsyncTraceSearchListener;
import io.grpc.stub.StreamObserver;
/**
* Created by xin on 2016/11/15.
*/
public class AsyncTraceSearchService extends AsyncTraceSearchServiceGrpc.AsyncTraceSearchServiceImplBase {
public AsyncTraceSearchService(AsyncTraceSearchListener asyncTraceSearchListener) {
}
@Override
public StreamObserver<QueryTask> search(StreamObserver<SearchResult> responseObserver) {
return super.search(responseObserver);
}
}
......@@ -19,7 +19,6 @@ public class SpanStorageService extends SpanStorageServiceGrpc.SpanStorageServic
return new StreamObserver<AckSpan>() {
@Override
public void onNext(AckSpan value) {
listener.storage(value);
}
......
package com.a.eye.skywalking.network.grpc.provider;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.grpc.Span;
import com.a.eye.skywalking.network.grpc.TraceSearchCondition;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.network.grpc.TraceSearchServiceGrpc;
import com.a.eye.skywalking.network.listener.TraceSearchListener;
import io.grpc.stub.StreamObserver;
import java.util.List;
/**
* Created by xin on 2016/11/12.
*/
......@@ -21,9 +18,10 @@ public class TraceSearchService extends TraceSearchServiceGrpc.TraceSearchServic
}
@Override
public void search(TraceSearchCondition request, StreamObserver<SearchResult> responseObserver) {
List<Span> spans = traceSearchListener.search(request.getTraceid());
responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build());
responseObserver.onCompleted();
public void search(TraceId request, StreamObserver<SearchResult> responseObserver) {
// List<Span> spans = traceSearchListener.search(request.getTraceid());
// responseObserver.onNext(SearchResult.newBuilder().addAllSpans(spans).build());
// responseObserver.onCompleted();
}
}
package com.a.eye.skywalking.network.listener;
/**
* Created by xin on 2016/11/15.
*/
public interface AsyncTraceSearchListener {
}
......@@ -4,9 +4,6 @@ import com.a.eye.skywalking.network.grpc.Span;
import java.util.List;
/**
* Created by xin on 2016/11/12.
*/
public interface TraceSearchListener{
List<Span> search(String traceId);
}
......@@ -4,7 +4,7 @@ option java_multiple_files = true;
option java_package = "com.a.eye.skywalking.network.grpc";
message AckSpan {
string traceId = 1;
TraceId traceId = 1;
string parentLevel = 2;
int32 levelId = 3;
int64 cost = 4;
......@@ -14,7 +14,7 @@ message AckSpan {
}
message RequestSpan {
string traceId = 1;
TraceId traceId = 1;
string parentLevel = 2;
int32 levelId = 3;
string viewPointId = 4;
......@@ -29,8 +29,12 @@ message RequestSpan {
string address = 14;
}
message TraceId{
repeated int64 segments = 1;
}
message Span{
string traceId = 1;
TraceId traceId = 1;
string levelId = 2; // parentLevelId + "." + levelId
string viewpoint = 3;
int64 starttime = 4;
......
......@@ -5,14 +5,22 @@ option java_package = "com.a.eye.skywalking.network.grpc";
import "Spans.proto";
service TraceSearchService{
rpc search(TraceSearchCondition) returns (SearchResult){};
service AsyncTraceSearchService {
rpc search (stream QueryTask) returns (stream SearchResult) {
};
}
message TraceSearchCondition{
string traceid = 1;
service TraceSearchService {
rpc search (TraceId) returns (SearchResult) {
};
}
message QueryTask {
int32 taskId = 1;
TraceId traceId = 2;
}
message SearchResult{
repeated Span spans= 1;
message SearchResult {
string traceid = 1;
repeated Span spans = 2;
}
......@@ -8,7 +8,6 @@ public class Config {
public static int PORT = 34000;
}
public static class DataConsumer {
public static int CHANNEL_SIZE = 10;
......
......@@ -10,16 +10,22 @@ public class Constants {
public static class SQL {
public static final String CREATE_TABLE = "CREATE TABLE " + TABLE_NAME + "\n" + "(\n"
+ " id INT PRIMARY KEY NOT NULL IDENTITY,\n"
+ " trace_id VARCHAR(64) NOT NULL,\n"
+ " tid_s0 INT NOT NULL,\n"
+ " tid_s1 BIGINT NOT NULL,\n"
+ " tid_s2 INT NOT NULL,\n"
+ " tid_s3 INT NOT NULL,\n"
+ " tid_s4 INT NOT NULL,\n"
+ " tid_s5 INT NOT NULL,\n"
+ " span_type INT NOT NULL, \n"
+ " file_name VARCHAR(32) NOT NULL,\n"
+ " offset BIGINT NOT NULL,\n"
+ " length INT NOT NULL\n" + ");\n";
public static final String CREATE_INDEX = "CREATE INDEX \"index_data_trace_id_index\" ON " + TABLE_NAME + " (trace_id);";
public static final String CREATE_INDEX = "CREATE INDEX \"index_data_trace_id_index\" ON " + TABLE_NAME + " "
+ "(tid_s0,tid_s1,tid_s2,tid_s3,tid_s4,tid_s5);";
public static final String INSERT_INDEX = "INSERT INTO " +TABLE_NAME + "(trace_id,span_type"
+ ",file_name,offset,length) VALUES(?,?,?,?,?)";
public static final String INSERT_INDEX = "INSERT INTO " +TABLE_NAME + "(tid_s0,tid_s1,tid_s2,tid_s3,tid_s4,tid_s5,span_type"
+ ",file_name,offset,length) VALUES(?,?,?,?,?,?,?,?,?,?)";
public static final String QUERY_TABLES = "SELECT count(1) AS TABLE_COUNT FROM INFORMATION_SCHEMA.TABLES "
+ "WHERE TABLE_NAME= '" + TABLE_NAME.toUpperCase() + "';";
......@@ -27,7 +33,8 @@ public class Constants {
public static final String QUERY_INDEX_SIZE = "SELECT count(1) AS INDEX_SIZE FROM " + TABLE_NAME;
public static final String QUERY_TRACE_ID = "SELECT span_type, file_name, offset, length "
+ " FROM "+ TABLE_NAME+ " WHERE trace_id = ?";
+ " FROM "+ TABLE_NAME+ " WHERE tid_s0 = ? AND tid_s1 = ? AND tid_s2 = ? AND tid_s3=? AND tid_s4=? AND"
+ " tid_s5 = ?";
public static final String DEFAULT_USER = "root";
......
......@@ -35,7 +35,7 @@ public class SpanDataFinder {
IndexMetaCollection indexMetaCollection = null;
try {
indexDBConnector = fetchIndexDBConnector(blockIndex);
indexMetaCollection = indexDBConnector.queryByTraceId(traceId);
indexMetaCollection = indexDBConnector.queryByTraceId(spiltTraceId(traceId));
} finally {
if (indexDBConnector != null) {
indexDBConnector.close();
......@@ -70,6 +70,10 @@ public class SpanDataFinder {
return result;
}
private static long[] spiltTraceId(String traceId){
return null;
}
private static IndexDBConnector fetchIndexDBConnector(long blockIndex) {
HikariDataSource datasource = getOrCreate(blockIndex);
IndexDBConnector indexDBConnector;
......
package com.a.eye.skywalking.storage.data.exception;
/**
* Created by xin on 2016/11/15.
*/
public class IllegalTraceIdException extends RuntimeException {
public IllegalTraceIdException(String message) {
super(message);
}
}
......@@ -97,20 +97,39 @@ public class IndexDBConnector {
public void batchUpdate(IndexMetaGroup<Long> metaGroup) throws SQLException {
int currentIndex = 0;
PreparedStatement ps = connection.prepareStatement(INSERT_INDEX);
for (IndexMetaInfo metaInfo : metaGroup.getMetaInfo()) {
ps.setString(1, metaInfo.getTraceId());
ps.setInt(2, metaInfo.getSpanType().getValue());
ps.setString(3, metaInfo.getFileName());
ps.setLong(4, metaInfo.getOffset());
ps.setInt(5, metaInfo.getLength());
ps.addBatch();
if (++currentIndex > Constants.MAX_BATCH_SIZE) {
PreparedStatement ps = null;
try {
ps = connection.prepareStatement(INSERT_INDEX);
boolean isCommitted = false;
for (IndexMetaInfo metaInfo : metaGroup.getMetaInfo()) {
ps.setInt(1, metaInfo.getTraceId()[0].intValue());
ps.setLong(2, metaInfo.getTraceId()[1]);
ps.setInt(3, metaInfo.getTraceId()[2].intValue());
ps.setInt(4, metaInfo.getTraceId()[3].intValue());
ps.setInt(5, metaInfo.getTraceId()[4].intValue());
ps.setInt(6, metaInfo.getTraceId()[5].intValue());
ps.setInt(7, metaInfo.getSpanType().getValue());
ps.setString(8, metaInfo.getFileName());
ps.setLong(9, metaInfo.getOffset());
ps.setInt(10, metaInfo.getLength());
ps.addBatch();
if (++currentIndex > Constants.MAX_BATCH_SIZE) {
ps.executeBatch();
isCommitted = true;
} else {
isCommitted = false;
}
}
if (!isCommitted) {
ps.executeBatch();
}
} finally {
if (ps != null)
ps.close();
}
ps.executeBatch();
ps.close();
}
public long fetchIndexSize() throws SQLException {
......@@ -125,10 +144,15 @@ public class IndexDBConnector {
return indexSize;
}
public IndexMetaCollection queryByTraceId(String traceId) {
public IndexMetaCollection queryByTraceId(long[] traceId) {
try {
PreparedStatement ps = connection.prepareStatement(QUERY_TRACE_ID);
ps.setString(1, traceId);
ps.setInt(1, (int) traceId[0]);
ps.setLong(2, (int) traceId[1]);
ps.setInt(3, (int) traceId[2]);
ps.setInt(4, (int) traceId[3]);
ps.setInt(5, (int) traceId[4]);
ps.setInt(6, (int) traceId[5]);
ResultSet rs = ps.executeQuery();
IndexMetaCollection collection = new IndexMetaCollection();
......
......@@ -36,8 +36,8 @@ public class IndexMetaInfo {
return spanData.getTraceStartTime();
}
public String getTraceId() {
return spanData.getTraceId();
public Long[] getTraceId() {
return spanData.getTraceIdSegments();
}
public String getLevelId() {
......
package com.a.eye.skywalking.storage.data.spandata;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.storage.data.exception.IllegalTraceIdException;
/**
* Created by xin on 2016/11/12.
*/
......@@ -9,8 +12,10 @@ public abstract class AbstractSpanData implements SpanData{
return (parentLevelId == null || parentLevelId.length() == 0) ? levelId + "" : parentLevelId + "." + levelId;
}
protected static long buildTraceStartTime(String traceId) {
String[] traceIdSegment = traceId.split("\\.");
return Long.parseLong(traceIdSegment[traceIdSegment.length - 5]);
protected Long[] traceIdToArrays(TraceId traceId) {
if (traceId.getSegmentsCount() != 6){
throw new IllegalTraceIdException("The length of traceId must equals five.");
}
return traceId.getSegmentsList().toArray(new Long[traceId.getSegmentsCount()]);
}
}
package com.a.eye.skywalking.storage.data.spandata;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.TraceId;
/**
* Created by xin on 2016/11/12.
......@@ -22,7 +23,7 @@ public class AckSpanData extends AbstractSpanData {
@Override
public long getTraceStartTime() {
return buildTraceStartTime(ackSpan.getTraceId());
return ackSpan.getTraceId().getSegments(1);
}
@Override
......@@ -31,8 +32,8 @@ public class AckSpanData extends AbstractSpanData {
}
@Override
public String getTraceId() {
return ackSpan.getTraceId();
public Long[] getTraceIdSegments() {
return traceIdToArrays(ackSpan.getTraceId());
}
@Override
......
package com.a.eye.skywalking.storage.data.spandata;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.TraceId;
/**
* Created by xin on 2016/11/12.
......@@ -22,7 +23,7 @@ public class RequestSpanData extends AbstractSpanData {
@Override
public long getTraceStartTime() {
return buildTraceStartTime(requestSpan.getTraceId());
return requestSpan.getTraceId().getSegments(1);
}
@Override
......@@ -31,8 +32,8 @@ public class RequestSpanData extends AbstractSpanData {
}
@Override
public String getTraceId() {
return requestSpan.getTraceId();
public Long[] getTraceIdSegments() {
return traceIdToArrays(requestSpan.getTraceId());
}
@Override
......@@ -40,19 +41,19 @@ public class RequestSpanData extends AbstractSpanData {
return buildLevelId(requestSpan.getParentLevel(), requestSpan.getLevelId());
}
public String getAddress(){
public String getAddress() {
return requestSpan.getAddress();
}
public String getApplicationId(){
public String getApplicationId() {
return requestSpan.getApplicationId();
}
public String getProcessNo(){
public String getProcessNo() {
return requestSpan.getProcessNo();
}
public long getStartTime(){
public long getStartTime() {
return requestSpan.getStartDate();
}
......
......@@ -8,7 +8,7 @@ public interface SpanData {
byte[] toByteArray();
String getTraceId();
Long[] getTraceIdSegments();
String getLevelId();
}
package com.a.eye.skywalking.storage.data.spandata;
import com.a.eye.skywalking.network.grpc.Span;
import com.a.eye.skywalking.network.grpc.TraceId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* Created by xin on 2016/11/12.
......@@ -54,7 +52,8 @@ public class SpanDataHelper {
builder = builder.setLevelId(requestSpanData.getLevelId()).setProcessNo(requestSpanData.getProcessNo())
.setSpanType(requestSpanData.getType()).setStarttime(requestSpanData.getStartTime())
.setStatusCode(ackSpanData.getStatusCode()).setTraceId(requestSpanData.getTraceId());
.setStatusCode(ackSpanData.getStatusCode())
.setTraceId(TraceId.newBuilder().addAllSegments(Arrays.asList(requestSpanData.getTraceIdSegments())));
return builder.build();
}
......
......@@ -20,7 +20,7 @@ server.port=34000
#datafile.path=/data/file
#
# the max size of data file (byte)
#datafile.size=3221225472
datafile.size=50000000
#
# the path that storage data index
#dataindex.path=/data/index
......@@ -29,7 +29,7 @@ server.port=34000
#dataindex.file_name=dataIndex
#
# the max size of data index
#dataindex.size=100000000
dataindex.size=1000000
#
# the cached size of data index operator
#dataindex.operator.cache_size=10
......
......@@ -3,16 +3,13 @@ import com.a.eye.skywalking.network.dependencies.io.grpc.ManagedChannelBuilder;
import com.a.eye.skywalking.network.dependencies.io.grpc.stub.ClientCallStreamObserver;
import com.a.eye.skywalking.network.dependencies.io.grpc.stub.ServerCallStreamObserver;
import com.a.eye.skywalking.network.dependencies.io.grpc.stub.StreamObserver;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.SendResult;
import com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc;
import com.a.eye.skywalking.network.grpc.*;
import static com.a.eye.skywalking.network.grpc.SpanStorageServiceGrpc.newStub;
public class StorageClient {
private static ManagedChannel channel =
ManagedChannelBuilder.forAddress("10.128.35.79", 34000).usePlaintext(true).build();
ManagedChannelBuilder.forAddress("127.0.0.1", 34000).usePlaintext(true).build();
private static SpanStorageServiceGrpc.SpanStorageServiceStub spanStorageServiceStub = newStub(channel);
......@@ -24,17 +21,20 @@ public class StorageClient {
public static void main(String[] args) throws InterruptedException {
RequestSpan requestSpan =
RequestSpan.newBuilder().setSpanType(1).setAddress("127.0.0.1").setApplicationId("1").setCallType("1")
.setLevelId(0).setProcessNo("19287").setStartDate(System.currentTimeMillis())
.setTraceId("1.0Final.1478661327960.8504828.2277.53.3").setUserId("1")
.setLevelId(0).setProcessNo("19287").setStartDate(System.currentTimeMillis()).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setUserId("1")
.setViewPointId("http://localhost:8080/wwww/test/helloWorld").build();
AckSpan ackSpan =
AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId("1.0Final.1478661327960.8504828.2277.53.3")
.setStatusCode(0).setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(1478661327960L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).setStatusCode(0)
.setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
long startTime = System.currentTimeMillis();
for(int i = 0; i < 1000; i++){
for (int i = 0; i < 1; i++) {
StreamObserver<AckSpan> ackSpanStreamObserver =
spanStorageServiceStub.storageACKSpan(new StreamObserver<SendResult>() {
@Override
......@@ -69,12 +69,13 @@ public class StorageClient {
endTime2 = System.currentTimeMillis();
}
});
for(int j = 0; j < 10000; j++){
for (int j = 0; j < 1; j++) {
requestSpanStreamObserver.onNext(requestSpan);
ackSpanStreamObserver.onNext(ackSpan);
ClientCallStreamObserver<RequestSpan> newRequestSpanStreamObserver = (ClientCallStreamObserver<RequestSpan>)requestSpanStreamObserver;
while(!newRequestSpanStreamObserver.isReady()){
ClientCallStreamObserver<RequestSpan> newRequestSpanStreamObserver =
(ClientCallStreamObserver<RequestSpan>) requestSpanStreamObserver;
while (!newRequestSpanStreamObserver.isReady()) {
Thread.sleep(1);
}
}
......
......@@ -35,7 +35,7 @@ public class DataFileWriterTest {
public void testConvertFile() throws Exception {
List<SpanData> spanData = new ArrayList<>();
spanData.add(new RequestSpanData(
RequestSpan.newBuilder().setTraceId("test-traceId").setStartDate(System.currentTimeMillis())
RequestSpan.newBuilder()/*.setTraceId("test-traceId")*/.setStartDate(System.currentTimeMillis())
.setProcessNo("7777").setLevelId(10).setParentLevel("0.0.0").setAddress("127.0.0.1").build()));
writer.write(spanData);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册