提交 25970a41 编写于 作者: A ascrutae

Merge remote-tracking branch 'origin/master'

...@@ -24,18 +24,15 @@ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTra ...@@ -24,18 +24,15 @@ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTra
public StreamObserver<QueryTask> search(final StreamObserver<SearchResult> responseObserver) { public StreamObserver<QueryTask> search(final StreamObserver<SearchResult> responseObserver) {
return new StreamObserver<QueryTask>() { return new StreamObserver<QueryTask>() {
private List<Span> spans; private List<Span> spans;
private int taskId;
@Override @Override
public void onNext(QueryTask value) { public void onNext(QueryTask value) {
taskId = value.getTaskId();
spans = searchListener.search(value.getTraceId()); spans = searchListener.search(value.getTraceId());
} }
@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
SearchResult.Builder builder = SearchResult.newBuilder(); SearchResult.Builder builder = SearchResult.newBuilder();
builder = builder.setTaskId(taskId);
responseObserver.onNext(builder.build()); responseObserver.onNext(builder.build());
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
...@@ -46,7 +43,6 @@ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTra ...@@ -46,7 +43,6 @@ public class AsyncTraceSearchServer extends AsyncTraceSearchServiceGrpc.AsyncTra
if(spans != null) { if(spans != null) {
builder = builder.addAllSpans(spans); builder = builder.addAllSpans(spans);
} }
builder = builder.setTaskId(taskId);
responseObserver.onNext(builder.build()); responseObserver.onNext(builder.build());
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
......
...@@ -16,11 +16,9 @@ service TraceSearchService { ...@@ -16,11 +16,9 @@ service TraceSearchService {
} }
message QueryTask { message QueryTask {
int32 taskId = 1;
TraceId traceId = 2; TraceId traceId = 2;
} }
message SearchResult { message SearchResult {
int32 taskId = 1;
repeated Span spans = 2; repeated Span spans = 2;
} }
...@@ -8,6 +8,7 @@ import com.a.eye.skywalking.network.Server; ...@@ -8,6 +8,7 @@ import com.a.eye.skywalking.network.Server;
import com.a.eye.skywalking.routing.config.Config; import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.config.ConfigInitializer; import com.a.eye.skywalking.routing.config.ConfigInitializer;
import com.a.eye.skywalking.routing.listener.SpanStorageListenerImpl; import com.a.eye.skywalking.routing.listener.SpanStorageListenerImpl;
import com.a.eye.skywalking.routing.listener.TraceSearchListenerImpl;
import com.a.eye.skywalking.routing.router.RoutingService; import com.a.eye.skywalking.routing.router.RoutingService;
import com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl; import com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl;
...@@ -25,12 +26,12 @@ public class Main { ...@@ -25,12 +26,12 @@ public class Main {
LogManager.setLogResolver(new Log4j2Resolver()); LogManager.setLogResolver(new Log4j2Resolver());
new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, RoutingService.getRouter()); new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, RoutingService.getRouter());
Server.newBuilder(Config.Routing.PORT).addSpanStorageService(new SpanStorageListenerImpl()).build().start(); Server.newBuilder(Config.Routing.PORT).addSpanStorageService(new SpanStorageListenerImpl()).addTraceSearchService(new TraceSearchListenerImpl()).build().start();
logger.info("Skywalking routing service was started."); logger.info("Skywalking routing service was started.");
Thread.currentThread().join(); Thread.currentThread().join();
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to start routing service.", e); logger.error("Failed to start routing service.", e);
}finally { } finally {
RoutingService.stop(); RoutingService.stop();
} }
} }
......
package com.a.eye.skywalking.routing.client;
import com.a.eye.skywalking.network.Client;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.network.grpc.client.TraceSearchClient;
import com.a.eye.skywalking.network.listener.client.StorageClientListener;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by wusheng on 2016/12/3.
*/
public class StorageClientCachePool {
public static StorageClientCachePool INSTANCE = new StorageClientCachePool();
private Map<String, Client> pool = new ConcurrentHashMap<>();
private Map<String, SpanStorageClient> spanStorageClientPool = new ConcurrentHashMap<>();
private Map<String, TraceSearchClient> traceSearchClientPool = new ConcurrentHashMap<>();
private ReentrantLock lock = new ReentrantLock();
private StorageClientCachePool() {
}
public Collection<TraceSearchClient> getClients() {
return traceSearchClientPool.values();
}
public SpanStorageClient getSpanStorageClient(String connectionURL, StorageClientListener listener) {
SpanStorageClient spanStorageClient = spanStorageClientPool.get(connectionURL);
if (spanStorageClient != null) {
return spanStorageClient;
}
lock.lock();
try {
spanStorageClient = spanStorageClientPool.get(connectionURL);
if (spanStorageClient == null) {
String[] urlSegment = connectionURL.split(":");
if (urlSegment.length != 2) {
throw new IllegalArgumentException();
}
Client client = new Client(urlSegment[0], Integer.valueOf(urlSegment[1]));
pool.put(connectionURL, client);
spanStorageClientPool.put(connectionURL, client.newSpanStorageClient(listener));
traceSearchClientPool.put(connectionURL, client.newTraceSearchClient());
}
return spanStorageClient;
} finally {
lock.unlock();
}
}
public void shutdown(String connectionURL) {
lock.lock();
try {
if (pool.containsKey(connectionURL)) {
Client client = pool.remove(connectionURL);
client.shutdown();
spanStorageClientPool.remove(connectionURL);
traceSearchClientPool.remove(connectionURL);
}
} finally {
lock.unlock();
}
}
}
...@@ -5,17 +5,26 @@ public class Config { ...@@ -5,17 +5,26 @@ public class Config {
public static int PORT = 23000; public static int PORT = 23000;
} }
public static class Search {
public static long CHECK_CYCLE = 100L;
public static long TIMEOUT = 3 * 1000L;
}
public static class RegistryCenter { public static class RegistryCenter {
public static String TYPE = "zookeeper"; public static String TYPE = "zookeeper";
public static String CONNECT_URL = "127.0.0.1:2181"; public static String CONNECT_URL = "127.0.0.1:2181";
public static String AUTH_SCHEMA = ""; public static String AUTH_SCHEMA = "";
public static String AUTH_INFO = ""; public static String AUTH_INFO = "";
} }
public static class StorageNode { public static class StorageNode {
public static String SUBSCRIBE_PATH = "/skywalking/storage_list"; public static String SUBSCRIBE_PATH = "/skywalking/storage_list";
} }
public static class Disruptor { public static class Disruptor {
public static int BUFFER_SIZE = 1024 * 128 * 4; public static int BUFFER_SIZE = 1024 * 128 * 4;
......
...@@ -6,6 +6,7 @@ import com.a.eye.skywalking.network.Client; ...@@ -6,6 +6,7 @@ import com.a.eye.skywalking.network.Client;
import com.a.eye.skywalking.network.grpc.RequestSpan; import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient; import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.network.listener.client.StorageClientListener; import com.a.eye.skywalking.network.listener.client.StorageClientListener;
import com.a.eye.skywalking.routing.client.StorageClientCachePool;
import com.a.eye.skywalking.routing.config.Config; import com.a.eye.skywalking.routing.config.Config;
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventHandler;
...@@ -16,22 +17,15 @@ import java.util.List; ...@@ -16,22 +17,15 @@ import java.util.List;
* Created by xin on 2016/11/29. * Created by xin on 2016/11/29.
*/ */
public abstract class AbstractRouteSpanEventHandler<T> implements EventHandler<T> { public abstract class AbstractRouteSpanEventHandler<T> implements EventHandler<T> {
protected Client client; protected SpanStorageClient client;
protected final int bufferSize; protected final int bufferSize;
protected boolean stop; protected boolean stop;
private volatile boolean previousSendFinish = true; private volatile boolean previousSendFinish = true;
private StorageClientListener listener;
public AbstractRouteSpanEventHandler(String connectionURL) { public AbstractRouteSpanEventHandler(String connectionURL) {
bufferSize = Config.Disruptor.FLUSH_SIZE; bufferSize = Config.Disruptor.FLUSH_SIZE;
String[] urlSegment = connectionURL.split(":"); listener = new StorageClientListener() {
if (urlSegment.length != 2) {
throw new IllegalArgumentException();
}
client = new Client(urlSegment[0], Integer.valueOf(urlSegment[1]));
}
protected SpanStorageClient getStorageClient() {
SpanStorageClient spanStorageClient = client.newSpanStorageClient(new StorageClientListener() {
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
previousSendFinish = true; previousSendFinish = true;
...@@ -43,9 +37,13 @@ public abstract class AbstractRouteSpanEventHandler<T> implements EventHandler<T ...@@ -43,9 +37,13 @@ public abstract class AbstractRouteSpanEventHandler<T> implements EventHandler<T
previousSendFinish = true; previousSendFinish = true;
HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.INFO, " consumed Successfully"); HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.INFO, " consumed Successfully");
} }
}); };
client = StorageClientCachePool.INSTANCE.getSpanStorageClient(connectionURL, listener);
}
protected SpanStorageClient getStorageClient() {
previousSendFinish = false; previousSendFinish = false;
return spanStorageClient; return client;
} }
public abstract String getExtraId(); public abstract String getExtraId();
......
...@@ -28,11 +28,6 @@ public class SpanDisruptor { ...@@ -28,11 +28,6 @@ public class SpanDisruptor {
return ackSpanDisruptor.saveAckSpan(ackSpan); return ackSpanDisruptor.saveAckSpan(ackSpan);
} }
public void stop() {
requestSpanDisruptor.shutDown();
ackSpanDisruptor.shutdown();
}
public String getConnectionURL() { public String getConnectionURL() {
return connectionURL; return connectionURL;
} }
......
package com.a.eye.skywalking.routing.listener;
import com.a.eye.skywalking.network.grpc.QueryTask;
import com.a.eye.skywalking.network.grpc.SearchResult;
import com.a.eye.skywalking.network.grpc.Span;
import com.a.eye.skywalking.network.grpc.TraceId;
import com.a.eye.skywalking.network.grpc.client.TraceSearchClient;
import com.a.eye.skywalking.network.listener.client.SearchClientListener;
import com.a.eye.skywalking.network.listener.server.TraceSearchListener;
import com.a.eye.skywalking.routing.client.StorageClientCachePool;
import com.a.eye.skywalking.routing.config.Config;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
* Created by wusheng on 2016/12/3.
*/
public class TraceSearchListenerImpl implements TraceSearchListener {
@Override
public List<Span> search(TraceId traceId) {
Collection<TraceSearchClient> clients = StorageClientCachePool.INSTANCE.getClients();
CountDownLatch countDownLatch = new CountDownLatch(clients.size());
ConcurrentHashMap<SearchResult, Boolean> results = new ConcurrentHashMap<>();
for (TraceSearchClient client : clients) {
client.search(QueryTask.newBuilder().setTraceId(traceId).build(), new SearchClientListener(){
@Override
public void onError(Throwable throwable) {
}
@Override
public void onReturn(SearchResult searchResult) {
results.put(searchResult, true);
countDownLatch.countDown();
}
@Override
public void onFinished() {
}
});
}
long waitTime = 0;
while(countDownLatch.getCount() != 0){
try {
Thread.sleep(Config.Search.CHECK_CYCLE);
} catch (InterruptedException e) {
}
waitTime += Config.Search.CHECK_CYCLE;
if(waitTime > Config.Search.TIMEOUT){
break;
}
}
List<Span> spans = new ArrayList<>(30);
for (SearchResult searchResult : results.keySet()) {
List<Span> result = searchResult.getSpansList();
if(result != null && result.size() > 0) {
spans.addAll(result);
}
}
return spans;
}
}
...@@ -5,6 +5,7 @@ import com.a.eye.skywalking.logging.api.LogManager; ...@@ -5,6 +5,7 @@ import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan; import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan; import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.registry.api.RegistryNode; import com.a.eye.skywalking.registry.api.RegistryNode;
import com.a.eye.skywalking.routing.client.StorageClientCachePool;
import com.a.eye.skywalking.routing.disruptor.NoopSpanDisruptor; import com.a.eye.skywalking.routing.disruptor.NoopSpanDisruptor;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor; import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.storage.listener.NodeChangesListener; import com.a.eye.skywalking.routing.storage.listener.NodeChangesListener;
...@@ -52,19 +53,16 @@ public class Router implements NodeChangesListener { ...@@ -52,19 +53,16 @@ public class Router implements NodeChangesListener {
} }
} }
Collections.sort(newDisruptors, new Comparator<SpanDisruptor>() { Collections.sort(newDisruptors, (o1, o2) -> {
@Override long o1Key = Long.parseLong(o1.getConnectionURL().replace(".", "").replace(":", ""));
public int compare(SpanDisruptor o1, SpanDisruptor o2) { long o2Key = Long.parseLong(o2.getConnectionURL().replace(".", "").replace(":", ""));
long o1Key = Long.parseLong(o1.getConnectionURL().replace(".", "").replace(":", ""));
long o2Key = Long.parseLong(o2.getConnectionURL().replace(".", "").replace(":", "")); if (o1Key == o2Key) {
return 0;
if (o1Key == o2Key) { } else if (o1Key > o2Key) {
return 0; return 1;
} else if (o1Key > o2Key) { } else {
return 1; return -1;
} else {
return -1;
}
} }
}); });
...@@ -74,6 +72,7 @@ public class Router implements NodeChangesListener { ...@@ -74,6 +72,7 @@ public class Router implements NodeChangesListener {
// 而后stop // 而后stop
for (SpanDisruptor removedDisruptor : removedDisruptors) { for (SpanDisruptor removedDisruptor : removedDisruptors) {
removedDisruptor.shutdown(); removedDisruptor.shutdown();
StorageClientCachePool.INSTANCE.shutdown(removedDisruptor.getConnectionURL());
} }
} }
......
...@@ -18,7 +18,6 @@ import static com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl.C ...@@ -18,7 +18,6 @@ import static com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl.C
public class NotifyListenerImpl implements NotifyListener { public class NotifyListenerImpl implements NotifyListener {
private NodeChangesListener listener; private NodeChangesListener listener;
private List<String> childrenConnectionURLOfPreviousChanged = new ArrayList<>();
private ReentrantLock lock = new ReentrantLock(); private ReentrantLock lock = new ReentrantLock();
public NotifyListenerImpl(String subscribePath, NodeChangesListener listener) { public NotifyListenerImpl(String subscribePath, NodeChangesListener listener) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册