提交 5e594483 编写于 作者: wu-sheng's avatar wu-sheng

remove NotifyListenerImpl, it’s not easy to understan its purpose.

上级 7aefb8f8
......@@ -10,6 +10,7 @@
<name>skywalking-agent</name>
<url>http://maven.apache.org</url>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
......
JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1JappR1
\ No newline at end of file
......@@ -2,15 +2,16 @@ package com.a.eye.skywalking.routing;
import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.logging.api.LogResolver;
import com.a.eye.skywalking.logging.impl.log4j2.Log4j2Resolver;
import com.a.eye.skywalking.network.Server;
import com.a.eye.skywalking.registry.RegistryCenterFactory;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.config.ConfigInitializer;
import com.a.eye.skywalking.routing.listener.SpanStorageListenerImpl;
import com.a.eye.skywalking.routing.listener.TraceSearchListenerImpl;
import com.a.eye.skywalking.routing.router.RoutingService;
import com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl;
import java.io.IOException;
import java.util.Map;
......@@ -25,7 +26,10 @@ public class Main {
initConfig();
LogManager.setLogResolver(new Log4j2Resolver());
new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, RoutingService.getRouter());
RegistryCenter center = RegistryCenterFactory.INSTANCE.getRegistryCenter(Config.RegistryCenter.TYPE);
center.start(fetchRegistryCenterConfig());
center.subscribe(Config.StorageNode.SUBSCRIBE_PATH, RoutingService.getRouter());
Server.newBuilder(Config.Routing.PORT).addSpanStorageService(new SpanStorageListenerImpl()).addTraceSearchService(new TraceSearchListenerImpl()).build().start();
logger.info("Skywalking routing service was started.");
Thread.currentThread().join();
......@@ -56,4 +60,12 @@ public class Main {
logger.info("{} = {}", entry.getKey(), entry.getValue());
}
}
private static Properties fetchRegistryCenterConfig() {
Properties centerConfig = new Properties();
centerConfig.setProperty(ZookeeperConfig.CONNECT_URL, Config.RegistryCenter.CONNECT_URL);
centerConfig.setProperty(ZookeeperConfig.AUTH_SCHEMA, Config.RegistryCenter.AUTH_SCHEMA);
centerConfig.setProperty(ZookeeperConfig.AUTH_INFO, Config.RegistryCenter.AUTH_INFO);
return centerConfig;
}
}
......@@ -14,9 +14,14 @@ public class Config {
public static class RegistryCenter {
public static String TYPE = "zookeeper";
public static String CONNECT_URL = "127.0.0.1:2181";
public static String AUTH_SCHEMA = "";
public static String AUTH_INFO = "";
public static String PATH_PREFIX = "/skywalking/routing_list/";
}
......
......@@ -4,20 +4,22 @@ import com.a.eye.skywalking.logging.api.ILog;
import com.a.eye.skywalking.logging.api.LogManager;
import com.a.eye.skywalking.network.grpc.AckSpan;
import com.a.eye.skywalking.network.grpc.RequestSpan;
import com.a.eye.skywalking.registry.api.NotifyListener;
import com.a.eye.skywalking.registry.api.RegistryNode;
import com.a.eye.skywalking.routing.client.StorageClientCachePool;
import com.a.eye.skywalking.routing.disruptor.NoopSpanDisruptor;
import com.a.eye.skywalking.routing.disruptor.SpanDisruptor;
import com.a.eye.skywalking.routing.storage.listener.NodeChangesListener;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
public class Router implements NodeChangesListener {
public class Router implements NotifyListener {
private static ILog logger = LogManager.getLogger(Router.class);
private ReentrantLock lock = new ReentrantLock();
private SpanDisruptor[] disruptors = new SpanDisruptor[0];
public SpanDisruptor lookup(RequestSpan requestSpan) {
......@@ -44,37 +46,42 @@ public class Router implements NodeChangesListener {
@Override
public void notify(List<RegistryNode> registryNodes) {
List<SpanDisruptor> newDisruptors = new ArrayList<SpanDisruptor>(Arrays.asList(disruptors));
List<SpanDisruptor> removedDisruptors = new ArrayList<SpanDisruptor>();
for (RegistryNode node : registryNodes) {
if (node.getChangeType() == RegistryNode.ChangeType.ADDED) {
newDisruptors.add(new SpanDisruptor(node.getNode()));
} else {
removedDisruptors.add(getAndRemoveSpanDistruptor(newDisruptors, node.getNode()));
try {
lock.lock();
List<SpanDisruptor> newDisruptors = new ArrayList<SpanDisruptor>(Arrays.asList(disruptors));
List<SpanDisruptor> removedDisruptors = new ArrayList<SpanDisruptor>();
for (RegistryNode node : registryNodes) {
if (node.getChangeType() == RegistryNode.ChangeType.ADDED) {
newDisruptors.add(new SpanDisruptor(node.getNode()));
} else {
removedDisruptors.add(getAndRemoveSpanDistruptor(newDisruptors, node.getNode()));
}
}
}
Collections.sort(newDisruptors, (o1, o2) -> {
long o1Key = Long.parseLong(o1.getConnectionURL().replace(".", "").replace(":", ""));
long o2Key = Long.parseLong(o2.getConnectionURL().replace(".", "").replace(":", ""));
if (o1Key == o2Key) {
return 0;
} else if (o1Key > o2Key) {
return 1;
} else {
return -1;
Collections.sort(newDisruptors, (o1, o2) -> {
long o1Key = Long.parseLong(o1.getConnectionURL().replace(".", "").replace(":", ""));
long o2Key = Long.parseLong(o2.getConnectionURL().replace(".", "").replace(":", ""));
if (o1Key == o2Key) {
return 0;
} else if (o1Key > o2Key) {
return 1;
} else {
return -1;
}
});
//先停止往里面存放数据
disruptors = newDisruptors.toArray(new SpanDisruptor[newDisruptors.size()]);
// 而后stop
for (SpanDisruptor removedDisruptor : removedDisruptors) {
removedDisruptor.shutdown();
StorageClientCachePool.INSTANCE.shutdown(removedDisruptor.getConnectionURL());
}
});
//先停止往里面存放数据
disruptors = newDisruptors.toArray(new SpanDisruptor[newDisruptors.size()]);
// 而后stop
for (SpanDisruptor removedDisruptor : removedDisruptors) {
removedDisruptor.shutdown();
StorageClientCachePool.INSTANCE.shutdown(removedDisruptor.getConnectionURL());
} finally {
lock.unlock();
}
}
......
package com.a.eye.skywalking.routing.storage.listener;
import com.a.eye.skywalking.registry.api.RegistryNode;
import java.util.List;
/**
* Created by xin on 2016/11/27.
*/
public interface NodeChangesListener {
void notify(List<RegistryNode> registryNodes);
}
package com.a.eye.skywalking.routing.storage.listener;
import com.a.eye.skywalking.registry.RegistryCenterFactory;
import com.a.eye.skywalking.registry.api.NotifyListener;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.api.RegistryNode;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.routing.config.Config;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import static com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl.ChangeType.Add;
import static com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl.ChangeType.Removed;
public class NotifyListenerImpl implements NotifyListener {
private NodeChangesListener listener;
private ReentrantLock lock = new ReentrantLock();
public NotifyListenerImpl(String subscribePath, NodeChangesListener listener) {
this.listener = listener;
RegistryCenter center = RegistryCenterFactory.INSTANCE.getRegistryCenter(Config.RegistryCenter.TYPE);
center.start(fetchRegistryCenterConfig());
center.subscribe(subscribePath, this::notify);
}
private Properties fetchRegistryCenterConfig() {
Properties centerConfig = new Properties();
centerConfig.setProperty(ZookeeperConfig.CONNECT_URL, Config.RegistryCenter.CONNECT_URL);
centerConfig.setProperty(ZookeeperConfig.AUTH_SCHEMA, Config.RegistryCenter.AUTH_SCHEMA);
centerConfig.setProperty(ZookeeperConfig.AUTH_INFO, Config.RegistryCenter.AUTH_INFO);
return centerConfig;
}
@Override
public void notify(List<RegistryNode> registryNodes) {
try{
lock.lock();
listener.notify(registryNodes);
}finally {
lock.unlock();
}
}
public enum ChangeType {
Removed, Add;
}
}
......@@ -5,7 +5,6 @@ import com.a.eye.skywalking.registry.api.CenterType;
import com.a.eye.skywalking.registry.api.RegistryCenter;
import com.a.eye.skywalking.registry.impl.zookeeper.ZookeeperConfig;
import com.a.eye.skywalking.routing.config.Config;
import com.a.eye.skywalking.routing.storage.listener.NotifyListenerImpl;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
......@@ -19,10 +18,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class NodeChangesListenerTest {
......@@ -31,7 +26,6 @@ public class NodeChangesListenerTest {
private TestingServer zkTestServer;
private RegistryCenter registryCenter;
private NotifyListenerImpl notifyListenerImpl;
@Before
public void setUp() throws Exception {
......@@ -45,7 +39,6 @@ public class NodeChangesListenerTest {
@Test
public void testRoutingStartBeforeStorageNode() throws InterruptedException {
notifyListenerImpl = new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, router);
registryCenter.register(Config.StorageNode.SUBSCRIBE_PATH + "/127.0.0.1:34000");
Thread.sleep(10);
List<String> nodeURL = new ArrayList<>();
......@@ -57,7 +50,6 @@ public class NodeChangesListenerTest {
@Test
public void testStorageNodeStartBeforeRoutingStart() throws InterruptedException {
registryCenter.register(Config.StorageNode.SUBSCRIBE_PATH + "/127.0.0.1:34000");
notifyListenerImpl = new NotifyListenerImpl(Config.StorageNode.SUBSCRIBE_PATH, router);
Thread.sleep(10);
List<String> nodeURL = new ArrayList<>();
nodeURL.add("127.0.0.1:34000");
......@@ -69,4 +61,4 @@ public class NodeChangesListenerTest {
zkTestServer.stop();
}
}
\ No newline at end of file
}
package com.a.eye.skywalking.web.config;
/**
* Created by xin on 2016/11/2.
*/
public class Config {
public static class RegistryCenter {
public static String AUTH_INFO = "";
public static String AUTH_SCHEMA = "";
public static String CONNECT_URL = "127.0.0.1:2181";
public static String PATH_PREFIX = "/skywalking/storage_list/";
}
public static class RoutingNode {
public static String SUBSCRIBE_PATH = "/skywalking/routing_list";
}
}
package com.a.eye.skywalking.web.config;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedList;
import java.util.Properties;
import java.util.logging.Logger;
public class ConfigInitializer {
private static Logger logger = Logger.getLogger(ConfigInitializer.class.getName());
public static void initialize(Properties properties, Class<?> rootConfigType) throws IllegalAccessException {
initNextLevel(properties, rootConfigType, new ConfigDesc());
}
private static void initNextLevel(Properties properties, Class<?> recentConfigType, ConfigDesc parentDesc) throws NumberFormatException, IllegalArgumentException, IllegalAccessException {
for (Field field : recentConfigType.getFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers())) {
String configKey = (parentDesc + "." +
field.getName()).toLowerCase();
String value = properties.getProperty(configKey);
if (value != null) {
if (field.getType().equals(int.class))
field.set(null, Integer.valueOf(value));
if (field.getType().equals(String.class))
field.set(null, value);
if (field.getType().equals(long.class))
field.set(null, Long.valueOf(value));
if (field.getType().equals(boolean.class))
field.set(null, Boolean.valueOf(value));
}
}
}
for (Class<?> innerConfiguration : recentConfigType.getClasses()) {
parentDesc.append(innerConfiguration.getSimpleName());
initNextLevel(properties, innerConfiguration, parentDesc);
parentDesc.removeLastDesc();
}
}
}
class ConfigDesc {
private LinkedList<String> descs = new LinkedList<String>();
void append(String currentDesc) {
descs.addLast(currentDesc);
}
void removeLastDesc() {
descs.removeLast();
}
@Override
public String toString() {
if (descs.size() == 0) {
return "";
}
StringBuilder ret = new StringBuilder(descs.getFirst());
boolean first = true;
for (String desc : descs) {
if (first) {
first = false;
continue;
}
ret.append(".").append(desc);
}
return ret.toString();
}
}
package com.a.eye.skywalking.web.controller;
import com.a.eye.skywalking.web.common.BaseController;
import com.a.eye.skywalking.web.config.Config;
import com.a.eye.skywalking.web.config.ConfigInitializer;
import com.a.eye.skywalking.web.dto.TraceTreeInfo;
import com.a.eye.skywalking.web.service.inter.ITraceTreeService;
import com.a.eye.skywalking.web.util.StringUtil;
......@@ -16,7 +18,10 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.Properties;
/**
* Created by xin on 16-3-29.
......@@ -34,6 +39,13 @@ public class SearchController extends BaseController {
return "index";
}
@PostConstruct
public void init() throws IOException, IllegalAccessException {
Properties properties = new Properties();
properties.load(SearchController.class.getResourceAsStream("/config.properties"));
ConfigInitializer.initialize(properties, Config.class);
}
@RequestMapping(value = "/search/traceId", produces = "application/json; charset=UTF-8")
@ResponseBody
public String loadTraceTree(@RequestParam("traceId") String traceId) {
......
# the registry center connect url (Default: zookeeper)
registrycenter.connect_url=127.0.0.1:2181
......@@ -38,8 +38,8 @@
</tx:advice>
<aop:config>
<aop:pointcut id="servicePointcut" expression="execution(* com.ai.cloud.skywalking.web.service.inter.*.*(..))"/>
<aop:pointcut id="servicePointcut" expression="execution(* com.a.eye.skywalking.web.service.inter.*.*(..))"/>
<aop:advisor pointcut-ref="servicePointcut" advice-ref="txAdvice"/>
</aop:config>
</beans>
\ No newline at end of file
</beans>
......@@ -47,6 +47,7 @@
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
<value>classpath:config.properties</value>
</list>
</property>
</bean>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册