提交 192e8a29 编写于 作者: A ascrutae

提交Routing逻辑,修复注册中心bug

上级 5b65fa5a
......@@ -34,12 +34,14 @@ public class ZookeeperRegistryCenter implements RegistryCenter {
@Override
public void subscribe(String path, final NotifyListener listener) {
client.subscribeChildChanges(path, new IZkChildListener() {
List<String> children = client.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> children) throws Exception {
listener.notify(children);
}
});
if (children != null && children.size() > 0)
listener.notify(children);
}
private boolean exists(String path) {
......
......@@ -58,6 +58,22 @@ public class ZookeeperRegistryCenterTest extends TestSuite {
assertEquals(addUrl.deleteCharAt(addUrl.length() - 1).toString(), "127.0.0.1:9400");
}
@Test
public void subscribeNodeAfterNodeRegistryTest() throws InterruptedException {
registryCenter.register("/skywalking/storage/127.0.0.1:9400");
final StringBuilder addUrl = new StringBuilder();
registryCenter.subscribe("/skywalking/storage", new NotifyListener() {
@Override
public void notify(List<String> currentUrls) {
for (String url : currentUrls) {
addUrl.append(url + ",");
}
}
});
Thread.sleep(100L);
assertEquals(addUrl.deleteCharAt(addUrl.length() - 1).toString(), "127.0.0.1:9400");
}
@Test
public void registryNodeTest() throws IOException, InterruptedException, KeeperException {
registryCenter.register("/skywalking/storage/test");
......
......@@ -13,6 +13,8 @@ message AckSpan {
string viewpointId = 7;
string userId = 8;
string applicationId = 9;
int32 routeKey = 10;
}
message RequestSpan {
......@@ -29,6 +31,8 @@ message RequestSpan {
string businessKey = 11;
int32 processNo = 13;
string address = 14;
int32 routeKey = 15;
}
message TraceId{
......
......@@ -18,5 +18,17 @@
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>1.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>3.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
package com.a.eye.skywalking.routing;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.logging.api.LogResolver;
import com.a.eye.skywalking.logging.impl.log4j2.Log4j2Resolver;
import com.a.eye.skywalking.network.Server;
import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.config.ConfigInitializer;
import com.a.eye.skywalking.routing.listener.SpanStorageListenerImpl;
import com.a.eye.skywalking.routing.router.RoutingService;
import com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
public class Main {
private static final ILog logger = LogManager.getLogger(Main.class);
public static void main(String[] args) {
try {
initConfig();
LogManager.setLogResolver(new Log4j2Resolver());
new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, RoutingService.getRouter());
Server.newBuilder(Config.Routing.PORT).addSpanStorageService(new SpanStorageListenerImpl()).build().start();
logger.info("Skywalking routing service was started.");
Thread.currentThread().join();
} catch (Exception e) {
logger.error("Failed to start routing service.", e);
}finally {
RoutingService.stop();
}
}
private static void initConfig() throws IllegalAccessException, IOException {
Properties properties = new Properties();
try {
properties.load(Main.class.getResourceAsStream("/config.properties"));
printStorageConfig(properties);
ConfigInitializer.initialize(properties, Config.class);
} catch (IllegalAccessException e) {
logger.error("Initialize server configuration failure.", e);
throw e;
} catch (IOException e) {
logger.error("Initialize server configuration failure.", e);
throw e;
}
}
private static void printStorageConfig(Properties config) {
for (Map.Entry<Object, Object> entry : config.entrySet()) {
logger.info("{} = {}", entry.getKey(), entry.getValue());
}
}
}
package com.a.eye.skywalking.routing.config;
public class Config {
public static class Routing {
public static int PORT = 23000;
}
public static class RegistryCenter {
public static String TYPE = "zookeeper";
public static String CONNECT_URL = "127.0.0.1:2181";
public static String AUTH_SCHEMA = "";
public static String AUTH_INFO = "";
}
public static class StorageNode {
public static String SUBSCRIBE_PATH = "/skywalking/storage_list";
}
public static class Disruptor {
public static int BUFFER_SIZE = 2 ^ 10;
}
}
package com.a.eye.skywalking.routing.config;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedList;
import java.util.Properties;
import java.util.logging.Logger;
public class ConfigInitializer {
private static Logger logger = Logger.getLogger(ConfigInitializer.class.getName());
public static void initialize(Properties properties, Class<?> rootConfigType) throws IllegalAccessException {
initNextLevel(properties, rootConfigType, new ConfigDesc());
}
private static void initNextLevel(Properties properties, Class<?> recentConfigType, ConfigDesc parentDesc) throws NumberFormatException, IllegalArgumentException, IllegalAccessException {
for (Field field : recentConfigType.getFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers())) {
String configKey = (parentDesc + "." +
field.getName()).toLowerCase();
String value = properties.getProperty(configKey);
if (value != null) {
if (field.getType().equals(int.class))
field.set(null, Integer.valueOf(value));
if (field.getType().equals(String.class))
field.set(null, value);
if (field.getType().equals(long.class))
field.set(null, Long.valueOf(value));
if (field.getType().equals(boolean.class))
field.set(null, Boolean.valueOf(value));
}
}
}
for (Class<?> innerConfiguration : recentConfigType.getClasses()) {
parentDesc.append(innerConfiguration.getSimpleName());
initNextLevel(properties, innerConfiguration, parentDesc);
parentDesc.removeLastDesc();
}
}
}
class ConfigDesc {
private LinkedList<String> descs = new LinkedList<String>();
void append(String currentDesc) {
descs.addLast(currentDesc);
}
void removeLastDesc() {
descs.removeLast();
}
@Override
public String toString() {
if (descs.size() == 0) {
return "";
}
StringBuilder ret = new StringBuilder(descs.getFirst());
boolean first = true;
for (String desc : descs) {
if (first) {
first = false;
continue;
}
ret.append(".").append(desc);
}
return ret.toString();
}
}
package com.a.eye.skywalking.routing.disruptor;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.network.Client;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.network.listener.client.StorageClientListener;
import com.lmax.disruptor.EventHandler;
/**
* Created by xin on 2016/11/29.
*/
public abstract class AbstractSpanEventHandler<T> implements EventHandler<T> {
protected Client client;
protected int bufferSize = 100;
protected boolean stop;
protected volatile boolean previousSendResult = true;
public AbstractSpanEventHandler(String connectionURL) {
String[] urlSegment = connectionURL.split(":");
if (urlSegment.length != 2) {
throw new IllegalArgumentException();
}
client = new Client(urlSegment[0], Integer.valueOf(urlSegment[1]));
}
protected SpanStorageClient getStorageClient() {
SpanStorageClient spanStorageClient = client.newSpanStorageClient(new StorageClientListener() {
@Override
public void onError(Throwable throwable) {
HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.ERROR,
"Failed to send span. error message :" + throwable.getMessage());
}
@Override
public void onBatchFinished() {
previousSendResult = true;
HealthCollector.getCurrentHeathReading(getExtraId()).updateData(HeathReading.INFO,
" consumed Successfully");
}
});
previousSendResult = false;
return spanStorageClient;
}
public abstract String getExtraId();
public void stop() {
stop = true;
}
}
package com.a.eye.skywalking.routing.disruptor;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
/**
* Created by xin on 2016/11/29.
*/
public class NoopSpanDisruptor extends SpanDisruptor {
@Override
public boolean saveSpan(AckSpan ackSpan) {
return false;
}
@Override
public boolean saveSpan(RequestSpan requestSpan) {
return false;
}
}
package com.a.eye.skywalking.routing.disruptor;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.routing.disruptor.ack.AckSpanDisruptor;
import com.a.eye.skywalking.routing.disruptor.request.RequestSpanDisruptor;
public class SpanDisruptor {
private AckSpanDisruptor ackSpanDisruptor;
private RequestSpanDisruptor requestSpanDisruptor;
private String connectionURL;
SpanDisruptor() {
}
public SpanDisruptor(String connectionURL) {
requestSpanDisruptor = new RequestSpanDisruptor(connectionURL);
ackSpanDisruptor = new AckSpanDisruptor(connectionURL);
this.connectionURL = connectionURL;
}
public boolean saveSpan(RequestSpan requestSpan) {
return requestSpanDisruptor.saveRequestSpan(requestSpan);
}
public boolean saveSpan(AckSpan ackSpan) {
return ackSpanDisruptor.saveAckSpan(ackSpan);
}
public void stop() {
requestSpanDisruptor.shutDown();
ackSpanDisruptor.shutdown();
}
public String getConnectionURL() {
return connectionURL;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SpanDisruptor that = (SpanDisruptor) o;
return connectionURL != null ? connectionURL.equals(that.connectionURL) : that.connectionURL == null;
}
@Override
public int hashCode() {
return connectionURL != null ? connectionURL.hashCode() : 0;
}
}
package com.a.eye.skywalking.routing.disruptor.ack;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.routing.router.RoutingService;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanEventHandler;
import java.util.ArrayList;
import java.util.List;
public class AckSpanBufferEventHandler extends AbstractSpanEventHandler<AckSpanHolder> {
private List<AckSpan> buffer = new ArrayList<>(bufferSize);
public AckSpanBufferEventHandler(String connectionURl) {
super(connectionURl);
}
@Override
public String getExtraId() {
return "AckSpanEventHandler";
}
@Override
public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event.getAckSpan());
if (stop){
try {
for (AckSpan ackSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
spanDisruptor.saveSpan(ackSpan);
}
}finally {
buffer.clear();
}
return ;
}
while (!previousSendResult){
try {
Thread.sleep(10L);
}catch (InterruptedException e){
}
}
if (endOfBatch || buffer.size() == bufferSize) {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendACKSpan(buffer);
buffer.clear();
}
}
}
package com.a.eye.skywalking.routing.disruptor.ack;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.routing.config.Config;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
/**
* Created by xin on 2016/11/29.
*/
public class AckSpanDisruptor {
private static ILog logger = LogManager.getLogger(AckSpanDisruptor.class);
private Disruptor<AckSpanHolder> ackSpanDisruptor;
private RingBuffer<AckSpanHolder> ackSpanRingBuffer;
private AckSpanBufferEventHandler ackSpanEventHandler;
public AckSpanDisruptor(String connectionURL) {
ackSpanDisruptor = new Disruptor<AckSpanHolder>(new AckSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
ackSpanEventHandler = new AckSpanBufferEventHandler(connectionURL);
ackSpanDisruptor.handleEventsWith(ackSpanEventHandler);
ackSpanDisruptor.start();
ackSpanRingBuffer = ackSpanDisruptor.getRingBuffer();
}
public boolean saveAckSpan(AckSpan ackSpan) {
long sequence = ackSpanRingBuffer.next();
try {
AckSpanHolder data = ackSpanRingBuffer.get(sequence);
data.setAckSpan(ackSpan);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO, "AckSpan stored.");
return true;
} catch (Exception e) {
logger.error("AckSpan trace-id[{}] store failure..", ackSpan.getTraceId(), e);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR, "AckSpan store failure.");
return false;
} finally {
ackSpanRingBuffer.publish(sequence);
}
}
public void shutdown() {
ackSpanEventHandler.stop();
ackSpanDisruptor.shutdown();
}
}
package com.a.eye.skywalking.routing.disruptor.ack;
import com.lmax.disruptor.EventFactory;
/**
* Created by xin on 2016/11/27.
*/
public class AckSpanFactory implements EventFactory<AckSpanHolder> {
@Override
public AckSpanHolder newInstance() {
return new AckSpanHolder();
}
}
package com.a.eye.skywalking.routing.disruptor.ack;
import com.a.eye.skywalking.network.grpc.AckSpan;
/**
* Created by xin on 2016/11/27.
*/
public class AckSpanHolder {
private AckSpan ackSpan;
public void setAckSpan(AckSpan ackSpan) {
this.ackSpan = ackSpan;
}
public AckSpan getAckSpan() {
return ackSpan;
}
}
package com.a.eye.skywalking.routing.disruptor.request;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.routing.config.Config;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
/**
* Created by xin on 2016/11/29.
*/
public class RequestSpanDisruptor {
private static ILog logger = LogManager.getLogger(RequestSpanDisruptor.class);
private Disruptor<RequestSpanHolder> requestSpanDisruptor;
private RingBuffer<RequestSpanHolder> requestSpanRingBuffer;
private SendRequestSpanEventHandler eventHandler;
public RequestSpanDisruptor(String connectionURL) {
requestSpanDisruptor = new Disruptor<RequestSpanHolder>(new RequestSpanFactory(), Config.Disruptor.BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
eventHandler = new SendRequestSpanEventHandler(connectionURL);
requestSpanDisruptor.handleEventsWith(eventHandler);
requestSpanDisruptor.start();
requestSpanRingBuffer = requestSpanDisruptor.getRingBuffer();
}
public boolean saveRequestSpan(RequestSpan requestSpan) {
long sequence = requestSpanRingBuffer.next();
try {
RequestSpanHolder data = requestSpanRingBuffer.get(sequence);
data.setRequestSpan(requestSpan);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.INFO, "RequestSpan stored.");
return true;
} catch (Exception e) {
logger.error("RequestSpan trace-id[{}] store failure..", requestSpan.getTraceId(), e);
HealthCollector.getCurrentHeathReading("StorageListener").updateData(HeathReading.ERROR, "RequestSpan store failure.");
return false;
} finally {
requestSpanRingBuffer.publish(sequence);
}
}
public void shutDown() {
eventHandler.stop();
requestSpanDisruptor.shutdown();
}
}
package com.a.eye.skywalking.routing.disruptor.request;
import com.lmax.disruptor.EventFactory;
/**
* Created by xin on 2016/11/27.
*/
public class RequestSpanFactory implements EventFactory<RequestSpanHolder> {
@Override
public RequestSpanHolder newInstance() {
return new RequestSpanHolder();
}
}
package com.a.eye.skywalking.routing.disruptor.request;
import com.a.eye.skywalking.network.grpc.RequestSpan;
/**
* Created by xin on 2016/11/27.
*/
public class RequestSpanHolder {
private RequestSpan requestSpan;
public void setRequestSpan(RequestSpan requestSpan) {
this.requestSpan = requestSpan;
}
public RequestSpan getRequestSpan() {
return requestSpan;
}
}
package com.a.eye.skywalking.routing.disruptor.request;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.grpc.client.SpanStorageClient;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.router.RoutingService;
import com.a.eye.skywalking.routing.disruptor.AbstractSpanEventHandler;
import java.util.ArrayList;
import java.util.List;
/**
* Created by xin on 2016/11/27.
*/
public class SendRequestSpanEventHandler extends AbstractSpanEventHandler<RequestSpanHolder> {
private List<RequestSpan> buffer = new ArrayList<>(bufferSize);
public SendRequestSpanEventHandler(String connectionURl) {
super(connectionURl);
}
@Override
public String getExtraId() {
return "RequestSpanEventHandler";
}
@Override
public void onEvent(RequestSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event.getRequestSpan());
if (stop) {
try {
for (RequestSpan ackSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
spanDisruptor.saveSpan(ackSpan);
}
} finally {
buffer.clear();
}
return;
}
while (!previousSendResult) {
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
}
}
if (endOfBatch || buffer.size() == bufferSize) {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendRequestSpan(buffer);
buffer.clear();
}
}
}
package com.a.eye.skywalking.routing.listener;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.network.listener.server.SpanStorageServerListener;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.router.RoutingService;
/**
* Created by xin on 2016/11/27.
*/
public class SpanStorageListenerImpl implements SpanStorageServerListener {
@Override
public boolean storage(RequestSpan requestSpan) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(requestSpan);
return spanDisruptor.saveSpan(requestSpan);
}
@Override
public boolean storage(AckSpan ackSpan) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
return spanDisruptor.saveSpan(ackSpan);
}
}
package com.a.eye.skywalking.routing.router;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.routing.disruptor.NoopSpanDisruptor;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.storage.listener.NodeChangesListener;
import com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class Router implements NodeChangesListener {
private static ILog logger = LogManager.getLogger(Router.class);
private SpanDisruptor[] disruptors = new SpanDisruptor[0];
private NoopSpanDisruptor noopSpanPool = new NoopSpanDisruptor();
public SpanDisruptor lookup(RequestSpan requestSpan) {
return getSpanDisruptor(requestSpan.getRouteKey());
}
public SpanDisruptor lookup(AckSpan ackSpan) {
return getSpanDisruptor(ackSpan.getRouteKey());
}
private SpanDisruptor getSpanDisruptor(int routKey) {
if (disruptors.length == 0) {
return noopSpanPool;
}
while(true){
int index = routKey % disruptors.length;
try {
return disruptors[index];
}catch (ArrayIndexOutOfBoundsException e){
}
}
}
@Override
public void notify(List<String> connectionURL, NotifyListenerImpl.ChangeType changeType) {
List<SpanDisruptor> newDisruptors = null;
if (changeType == NotifyListenerImpl.ChangeType.Add) {
newDisruptors = new ArrayList<>(this.disruptors.length + connectionURL.size());
for (String url : connectionURL) {
newDisruptors.add(new SpanDisruptor(url));
}
}
if (changeType == NotifyListenerImpl.ChangeType.Removed) {
newDisruptors = new ArrayList<SpanDisruptor>(disruptors.length - connectionURL.size());
for (SpanDisruptor disruptor : disruptors) {
if (!connectionURL.contains(disruptor.getConnectionURL())) {
newDisruptors.add(disruptor);
}
}
}
Collections.sort(newDisruptors, new Comparator<SpanDisruptor>() {
@Override
public int compare(SpanDisruptor o1, SpanDisruptor o2) {
long o1Key = Long.parseLong(o1.getConnectionURL().replace(".", "").replace(":", ""));
long o2Key = Long.parseLong(o2.getConnectionURL().replace(".", "").replace(":", ""));
if (o1Key == o2Key) {
return 0;
} else if (o1Key > o2Key) {
return 1;
} else {
return -1;
}
}
});
disruptors = newDisruptors.toArray(new SpanDisruptor[newDisruptors.size()]);
}
public void stop() {
logger.info("Stopping routing service.");
for (SpanDisruptor disruptor : disruptors) {
}
}
}
package com.a.eye.skywalking.routing.router;
/**
* Created by xin on 2016/11/29.
*/
public class RoutingService {
public static Router router;
public static Router getRouter() {
if (router == null) {
router = new Router();
}
return router;
}
public static void stop() {
if (router != null)
router.stop();
}
}
package com.a.eye.skywalking.routing.storage.listener;
import java.util.List;
/**
* Created by xin on 2016/11/27.
*/
public interface NodeChangesListener {
void notify(List<String> url, NotifyListenerImpl.ChangeType type);
}
package com.a.eye.skywalking.routing.storage.listener;
import com.a.eye.skywalking.registry.RegistryCenterFactory;
import com.a.eye.skywalking.registry.api.NotifyListener;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.routing.config.Config;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import static com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl.ChangeType.Add;
import static com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl.ChangeType.Removed;
public class NotifyListenerImpl implements NotifyListener {
private NodeChangesListener listener;
private List<String> childrenConnectionURLOfPreviousChanged = new ArrayList<>();
private ReentrantLock lock = new ReentrantLock();
public NotifyListenerImpl(String subscribePath, NodeChangesListener listener) {
this.listener = listener;
RegistryCenter center = RegistryCenterFactory.INSTANCE.getRegistryCenter(Config.RegistryCenter.TYPE);
center.start(fetchRegistryCenterConfig());
center.subscribe(subscribePath, this::notify);
}
private Properties fetchRegistryCenterConfig() {
Properties centerConfig = new Properties();
centerConfig.setProperty(ZookeeperConfig.CONNECT_URL, Config.RegistryCenter.CONNECT_URL);
centerConfig.setProperty(ZookeeperConfig.AUTH_SCHEMA, Config.RegistryCenter.AUTH_SCHEMA);
centerConfig.setProperty(ZookeeperConfig.AUTH_INFO, Config.RegistryCenter.AUTH_INFO);
return centerConfig;
}
@Override
public void notify(List<String> currentUrls) {
lock.lock();
try {
List<String> URL = new ArrayList<>(currentUrls);
if (childrenConnectionURLOfPreviousChanged.size() > URL.size()) {
childrenConnectionURLOfPreviousChanged.removeAll(URL);
listener.notify(childrenConnectionURLOfPreviousChanged, Removed);
} else {
URL.removeAll(childrenConnectionURLOfPreviousChanged);
listener.notify(URL, Add);
}
childrenConnectionURLOfPreviousChanged = new ArrayList<>(URL);
} finally {
lock.unlock();
}
}
public enum ChangeType {
Removed, Add;
}
}
# logger #
log4j.rootLogger=Rolling_File,CONSOLE
log4j.logger.org.apache=OFF
log4j.logger.io.netty=OFF
log4j.org.elasticsearch=OFF
log4j.logger.com.a.eye.skywalking.network.dependencies.io.netty=OFF
log4j.appender.Rolling_File=org.apache.log4j.RollingFileAppender
log4j.appender.Rolling_File.Threshold=WARN
log4j.appender.Rolling_File.File=../logs/storage-server-log4j.log
log4j.appender.Rolling_File.Append=true
log4j.appender.Rolling_File.MaxFileSize=100MB
log4j.appender.Rolling_File.MaxBackupIndex=5
log4j.appender.Rolling_File.layout=org.apache.log4j.PatternLayout
log4j.appender.Rolling_File.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
# Console Appender #
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Target=System.out
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" name="storage-server" packages="">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d %-5p %c{1}:%L - %m%n"/>
</Console>
<RollingFile name="RollingFile" fileName="../logs/storage-server.log"
filePattern="logs/storage-server-%d{MM-dd-yyyy}.log.gz"
ignoreExceptions="false">
<PatternLayout>
<Pattern>%d %-5p %c{1}:%L - %m%n</Pattern>
</PatternLayout>
<SizeBasedTriggeringPolicy size="200MB"/>
<DefaultRolloverStrategy max="5"/>
</RollingFile>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="RollingFile" level="INFO"/>
</Root>
</Loggers>
</Configuration>
package com.a.eye.skywalking.routing.router;
import com.a.eye.skywalking.registry.RegistryCenterFactory;
import com.a.eye.skywalking.registry.api.CenterType;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class NodeChangesListenerTest {
@Spy
private Router router = new Router();
private TestingServer zkTestServer;
private RegistryCenter registryCenter;
private NotifyListenerImpl notifyListenerImpl;
@Before
public void setUp() throws Exception {
zkTestServer = new TestingServer(2181, true);
registryCenter = RegistryCenterFactory.INSTANCE.getRegistryCenter(CenterType.DEFAULT_CENTER_TYPE);
Properties config = new Properties();
config.put(ZookeeperConfig.CONNECT_URL, "127.0.0.1:2181");
registryCenter.start(config);
}
@Test
public void testRoutingStartBeforeStorageNode() throws InterruptedException {
notifyListenerImpl = new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, router);
registryCenter.register(Config.StorageNode.SUBSCRIBE_PATH + "/127.0.0.1:34000");
Thread.sleep(10);
List<String> nodeURL = new ArrayList<>();
nodeURL.add("127.0.0.1:34000");
verify(router, times(1)).notify(eq(nodeURL), eq(NotifyListenerImpl.ChangeType.Add));
}
@Test
public void testStorageNodeStartBeforeRoutingStart() throws InterruptedException {
registryCenter.register(Config.StorageNode.SUBSCRIBE_PATH + "/127.0.0.1:34000");
notifyListenerImpl = new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, router);
Thread.sleep(10);
List<String> nodeURL = new ArrayList<>();
nodeURL.add("127.0.0.1:34000");
verify(router, times(1)).notify(eq(nodeURL), eq(NotifyListenerImpl.ChangeType.Add));
}
@After
public void tearUp() throws IOException {
zkTestServer.stop();
}
}
\ No newline at end of file
# logger #
log4j.rootLogger=CONSOLE
log4j.logger.org.apache=OFF
log4j.logger.io.netty=OFF
log4j.org.elasticsearch=OFF
log4j.logger.com.a.eye.skywalking.network.dependencies.io.netty=OFF
# Console Appender #
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Target=System.out
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" name="storage-server" packages="">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d %-5p %c{1}:%L - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="Console" level="DEBUG"/>
</Root>
</Loggers>
</Configuration>
......@@ -40,7 +40,7 @@ public class Main {
public static void main(String[] args) {
try {
initializeParam();
initConfig();
HealthCollector.init(SERVER_REPORTER_NAME);
new ElasticBootstrap().boot(NetUtils.getIndexServerPort());
......@@ -65,6 +65,7 @@ public class Main {
logger.error("SkyWalking storage server start failure.", e);
} finally {
server.stop();
System.exit(-1);
}
}
......@@ -79,7 +80,7 @@ public class Main {
registryCenter.register(PATH_PREFIX + NetUtils.getLocalAddress().getHostAddress() + ":" + Config.Server.PORT);
}
private static void initializeParam() throws IllegalAccessException, IOException {
private static void initConfig() throws IllegalAccessException, IOException {
Properties properties = new Properties();
try {
properties.load(Main.class.getResourceAsStream("/config.properties"));
......
......@@ -30,7 +30,7 @@ public class DataFileNameDesc {
public DataFileNameDesc(String fileName) {
int lastIndex = fileName.lastIndexOf('_');
try {
this.name = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SSS").parse(fileName.substring(0, lastIndex - 1))
this.name = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SSS").parse(fileName.substring(0, lastIndex))
.getTime();
} catch (ParseException e) {
}
......@@ -72,4 +72,11 @@ public class DataFileNameDesc {
public int getSuffix() {
return suffix;
}
@Override
public String toString() {
return "DataFileNameDesc{" +
"fileNameStr='" + fileNameStr + '\'' +
'}';
}
}
......@@ -58,4 +58,14 @@ public class IndexMetaInfo {
public SpanType getSpanType() {
return spanType;
}
@Override
public String toString() {
return "IndexMetaInfo{" +
"spanType=" + spanType +
", nameDesc=" + nameDesc +
", offset=" + offset +
", length=" + length +
'}';
}
}
# logger #
log4j.rootLogger=Rolling_File
log4j.rootLogger=Rolling_File,CONSOLE
log4j.logger.org.apache=OFF
log4j.logger.io.netty=OFF
log4j.org.elasticsearch=OFF
log4j.org.I0Itec=OFF
log4j.logger.com.a.eye.skywalking.network.dependencies.io.grpc=OFF
log4j.logger.com.a.eye.skywalking.network.dependencies.io.netty=OFF
log4j.appender.Rolling_File=org.apache.log4j.RollingFileAppender
......@@ -14,3 +16,8 @@ log4j.appender.Rolling_File.MaxBackupIndex=5
log4j.appender.Rolling_File.layout=org.apache.log4j.PatternLayout
log4j.appender.Rolling_File.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
# Console Appender #
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Target=System.out
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d %-5p %c{1}:%L - %m%n
......@@ -17,6 +17,7 @@
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="RollingFile" level="INFO"/>
<AppenderRef ref="Console" level="DEBUG"/>
</Root>
</Loggers>
</Configuration>
......@@ -2,8 +2,8 @@ import java.util.concurrent.CountDownLatch;
public class StorageClient {
private static int THREAD_COUNT = 4;
private static final long COUNT = 1_000_000_000;
private static int THREAD_COUNT = 1;
private static final long COUNT = 1;
public static void main(String[] args) throws InterruptedException {
......
......@@ -23,7 +23,7 @@ public class StorageThread extends Thread {
StorageThread(long count, CountDownLatch countDownLatch, int index) {
listener = new MyStorageClientListener();
client = new Client("127.0.0.1", 34000).newSpanStorageClient(listener);
client = new Client("127.0.0.1", 23000).newSpanStorageClient(listener);
this.count = count;
this.countDownLatch = countDownLatch;
this.index = index;
......@@ -43,15 +43,18 @@ public class StorageThread extends Thread {
.setUserId("1").setViewPointId("http://localhost:8080/wwww/test/helloWorld").build();
AckSpan ackSpan = AckSpan.newBuilder().setLevelId(0).setCost(10).setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277).addSegments(Thread.currentThread().getId()).addSegments(3)
TraceId.newBuilder().addSegments(201611).addSegments(value).addSegments(8504828).addSegments(2277).addSegments(53).addSegments(3)
.build()).setStatusCode(0).setViewpointId("http://localhost:8080/wwww/test/helloWorld").build();
requestSpanList.add(requestSpan);
ackSpanList.add(ackSpan);
cycle++;
if (cycle == 100) {
if (cycle == 1) {
client.sendACKSpan(ackSpanList);
client.sendRequestSpan(requestSpanList);
cycle = 0;
while(!listener.isCompleted){
while (!listener.isCompleted) {
LockSupport.parkNanos(1);
}
listener.begin();
......@@ -59,10 +62,6 @@ public class StorageThread extends Thread {
requestSpanList.clear();
}
requestSpanList.add(requestSpan);
ackSpanList.add(ackSpan);
cycle++;
if (i % 10_000 == 0) {
System.out.println("index-" + index + " num=" + i + " " + value);
}
......
......@@ -36,7 +36,7 @@ public class SearchClient {
StreamObserver<QueryTask> searchResult = searchServiceStub.search(serverStreamObserver);
searchResult.onNext(QueryTask.newBuilder().setTraceId(
TraceId.newBuilder().addSegments(201611).addSegments(1479803629139L).addSegments(8504828)
TraceId.newBuilder().addSegments(201611).addSegments(1480475308493L).addSegments(8504828)
.addSegments(2277).addSegments(53).addSegments(3).build()).build());
searchResult.onCompleted();
......
package com.a.eye.skywalking.storage;
import com.a.eye.skywalking.storage.data.file.DataFileNameDesc;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
/**
* Created by xin on 2016/11/30.
*/
public class TestFile {
public static void main(String[] args) throws IOException {
// File file = new File("/Users/xin/workbench/data/file", "2016_11_29_23_02_55_517_1000");
// System.out.println(file.length());
//
// FileInputStream byteInputStream = new FileInputStream(file);
// byte[] bytes = new byte[1024];
// int count = 0;
// int length = 0;
// while ((count = byteInputStream.read(bytes)) != -1) {
// length += count;
// }
//
// System.out.println(length);
long startTime = System.currentTimeMillis();
System.out.println(startTime);
System.out.println(new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss_SSS").format(startTime));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册