提交 62ca37bc 编写于 作者: wu-sheng's avatar wu-sheng

Refactor codebases for v6, restore modulization back

上级 42072d85
...@@ -18,9 +18,15 @@ ...@@ -18,9 +18,15 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone; package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.slf4j.*; import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -29,11 +35,8 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider { ...@@ -29,11 +35,8 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(ClusterModuleStandaloneProvider.class); private static final Logger logger = LoggerFactory.getLogger(ClusterModuleStandaloneProvider.class);
private final StandaloneServiceManager serviceManager;
public ClusterModuleStandaloneProvider() { public ClusterModuleStandaloneProvider() {
super(); super();
this.serviceManager = new StandaloneServiceManager();
} }
@Override public String name() { @Override public String name() {
...@@ -49,8 +52,9 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider { ...@@ -49,8 +52,9 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
} }
@Override public void prepare() throws ServiceNotProvidedException { @Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(ModuleRegister.class, new StandaloneModuleRegister(serviceManager)); StandaloneManager standaloneManager = new StandaloneManager();
this.registerServiceImplementation(ModuleQuery.class, new StandaloneModuleQuery(serviceManager)); this.registerServiceImplementation(ClusterRegister.class, standaloneManager);
this.registerServiceImplementation(ClusterNodesQuery.class, standaloneManager);
} }
@Override public void start() throws ModuleStartException { @Override public void start() throws ModuleStartException {
......
...@@ -18,25 +18,35 @@ ...@@ -18,25 +18,35 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone; package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import java.util.*; import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.InstanceDetails; import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import java.util.ArrayList;
import java.util.List;
/** /**
* @author peng-yongsheng * A cluster manager simulator. Work in memory only.
* Also return the current instance.
*
* @author peng-yongsheng, Wu Sheng
*/ */
public class StandaloneServiceManager { public class StandaloneManager implements ClusterNodesQuery, ClusterRegister {
private final Map<String, InstanceDetails> instanceDetailsMap; private volatile RemoteInstance remoteInstance;
public StandaloneServiceManager() {
this.instanceDetailsMap = new HashMap<>();
}
public void put(String moduleName, String providerName, InstanceDetails instanceDetails) { @Override public void registerRemote(RemoteInstance remoteInstance) {
instanceDetailsMap.put(moduleName + "/" + providerName, instanceDetails); this.remoteInstance = remoteInstance;
} }
public InstanceDetails get(String moduleName, String providerName) { @Override
return instanceDetailsMap.get(moduleName + "/" + providerName); public List<RemoteInstance> queryRemoteNodes() {
if(remoteInstance == null){
return new ArrayList(0);
}
ArrayList remoteList = new ArrayList(1);
remoteList.add(remoteInstance);
return remoteList;
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.*;
/**
* @author peng-yongsheng
*/
public class StandaloneModuleRegister implements ModuleRegister {
private final StandaloneServiceManager serviceManager;
StandaloneModuleRegister(StandaloneServiceManager serviceManager) {
this.serviceManager = serviceManager;
}
@Override public void register(String moduleName, String providerName,
InstanceDetails instanceDetails) {
serviceManager.put(moduleName, providerName, instanceDetails);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import org.apache.skywalking.oap.server.core.cluster.*;
/**
* @author peng-yongsheng
*/
public class StandaloneRegister implements ModuleRegister {
@Override public void register(String moduleName, String providerName,
InstanceDetails instanceDetails) throws ServiceRegisterException {
}
}
...@@ -18,24 +18,20 @@ ...@@ -18,24 +18,20 @@
package org.apache.skywalking.oap.server.cluster.plugin.standalone; package org.apache.skywalking.oap.server.cluster.plugin.standalone;
import java.util.*; import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.*; import org.junit.Assert;
import org.junit.Test;
/** public class StandaloneManagerTest {
* @author peng-yongsheng @Test
*/ public void test(){
public class StandaloneModuleQuery implements ModuleQuery { StandaloneManager standaloneManager = new StandaloneManager();
RemoteInstance remote1 = new RemoteInstance();
private final StandaloneServiceManager serviceManager; RemoteInstance remote2 = new RemoteInstance();
StandaloneModuleQuery(StandaloneServiceManager serviceManager) {
this.serviceManager = serviceManager;
}
@Override standaloneManager.registerRemote(remote1);
public List<InstanceDetails> query(String moduleName, String providerName) { Assert.assertEquals(remote1, standaloneManager.queryRemoteNodes().get(0));
List<InstanceDetails> instanceDetails = new ArrayList<>(1); standaloneManager.registerRemote(remote2);
instanceDetails.add(serviceManager.get(moduleName, providerName)); Assert.assertEquals(remote2, standaloneManager.queryRemoteNodes().get(0));
return instanceDetails;
} }
} }
...@@ -19,15 +19,27 @@ ...@@ -19,15 +19,27 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper; package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import org.apache.curator.RetryPolicy; import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.*; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.*; import org.apache.curator.x.discovery.ServiceCache;
import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.skywalking.oap.server.library.module.*; import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.slf4j.*; import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * Use Zookeeper to manage all instances in SkyWalking cluster.
*
* @author peng-yongsheng, Wu Sheng
*/ */
public class ClusterModuleZookeeperProvider extends ModuleProvider { public class ClusterModuleZookeeperProvider extends ModuleProvider {
...@@ -35,15 +47,13 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider { ...@@ -35,15 +47,13 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
private static final String BASE_PATH = "/skywalking"; private static final String BASE_PATH = "/skywalking";
private final ServiceCacheManager cacheManager;
private final ClusterModuleZookeeperConfig config; private final ClusterModuleZookeeperConfig config;
private CuratorFramework client; private CuratorFramework client;
private ServiceDiscovery<InstanceDetails> serviceDiscovery; private ServiceDiscovery<RemoteInstance> serviceDiscovery;
public ClusterModuleZookeeperProvider() { public ClusterModuleZookeeperProvider() {
super(); super();
this.config = new ClusterModuleZookeeperConfig(); this.config = new ClusterModuleZookeeperConfig();
this.cacheManager = new ServiceCacheManager();
} }
@Override public String name() { @Override public String name() {
...@@ -58,27 +68,35 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider { ...@@ -58,27 +68,35 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
return config; return config;
} }
@Override public void prepare() throws ServiceNotProvidedException { @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTimeMs(), config.getMaxRetries()); RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTimeMs(), config.getMaxRetries());
client = CuratorFrameworkFactory.newClient(config.getHostPort(), retryPolicy); client = CuratorFrameworkFactory.newClient(config.getHostPort(), retryPolicy);
serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client) serviceDiscovery = ServiceDiscoveryBuilder.builder(RemoteInstance.class).client(client)
.basePath(BASE_PATH) .basePath(BASE_PATH)
.watchInstances(true) .watchInstances(true)
.serializer(new SWInstanceSerializer()).build(); .serializer(new SWInstanceSerializer()).build();
this.registerServiceImplementation(ModuleRegister.class, new ZookeeperModuleRegister(serviceDiscovery, cacheManager)); String remoteName = "remote";
this.registerServiceImplementation(ModuleQuery.class, new ZookeeperModuleQuery(cacheManager)); ServiceCache<RemoteInstance> serviceCache = serviceDiscovery.serviceCacheBuilder()
} .name(remoteName)
.build();
@Override public void start() throws ModuleStartException {
try { try {
client.start(); client.start();
client.blockUntilConnected(); client.blockUntilConnected();
serviceDiscovery.start(); serviceDiscovery.start();
serviceCache.start();
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
this.registerServiceImplementation(ClusterRegister.class, new ZookeeperNodeRegister(serviceDiscovery, remoteName));
this.registerServiceImplementation(ClusterNodesQuery.class, new ZookeeperModuleQuery(serviceCache));
}
@Override public void start() {
} }
@Override public void notifyAfterCompleted() { @Override public void notifyAfterCompleted() {
......
...@@ -22,21 +22,21 @@ import com.google.gson.Gson; ...@@ -22,21 +22,21 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken; import com.google.gson.reflect.TypeToken;
import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer; import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.skywalking.oap.server.core.cluster.InstanceDetails; import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class SWInstanceSerializer implements InstanceSerializer<InstanceDetails> { public class SWInstanceSerializer implements InstanceSerializer<RemoteInstance> {
private final Gson gson = new Gson(); private final Gson gson = new Gson();
@Override public byte[] serialize(ServiceInstance<InstanceDetails> instance) throws Exception { @Override public byte[] serialize(ServiceInstance<RemoteInstance> instance) throws Exception {
return gson.toJson(instance).getBytes(); return gson.toJson(instance).getBytes();
} }
@Override public ServiceInstance<InstanceDetails> deserialize(byte[] bytes) throws Exception { @Override public ServiceInstance<RemoteInstance> deserialize(byte[] bytes) throws Exception {
return gson.fromJson(new String(bytes), new TypeToken<ServiceInstance<InstanceDetails>>() { return gson.fromJson(new String(bytes), new TypeToken<ServiceInstance<RemoteInstance>>() {
}.getType()); }.getType());
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.skywalking.oap.server.core.cluster.InstanceDetails;
/**
* @author peng-yongsheng
*/
public class ServiceCacheManager {
private final Map<String, ServiceCache<InstanceDetails>> serviceCacheMap;
public ServiceCacheManager() {
this.serviceCacheMap = new ConcurrentHashMap<>();
}
public void put(String name, ServiceCache<InstanceDetails> cache) {
serviceCacheMap.put(name, cache);
}
public ServiceCache<InstanceDetails> get(String name) {
return serviceCacheMap.get(name);
}
}
...@@ -19,26 +19,27 @@ ...@@ -19,26 +19,27 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper; package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.*; import java.util.*;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.skywalking.oap.server.core.cluster.*;
/** /**
* @author peng-yongsheng * @author peng-yongsheng, Wu Sheng
*/ */
public class ZookeeperModuleQuery implements ModuleQuery { public class ZookeeperModuleQuery implements ClusterNodesQuery {
private final ServiceCacheManager cacheManager; private final ServiceCache<RemoteInstance> serviceCache;
ZookeeperModuleQuery(ServiceCacheManager cacheManager) { ZookeeperModuleQuery(ServiceCache<RemoteInstance> serviceCache) {
this.cacheManager = cacheManager; this.serviceCache = serviceCache;
} }
@Override @Override
public List<InstanceDetails> query(String moduleName, String providerName) throws ServiceRegisterException { public List<RemoteInstance> queryRemoteNodes() throws ServiceRegisterException {
List<ServiceInstance<InstanceDetails>> serviceInstances = cacheManager.get(NodeNameBuilder.build(moduleName, providerName)).getInstances(); List<ServiceInstance<RemoteInstance>> serviceInstances = serviceCache.getInstances();
List<InstanceDetails> instanceDetails = new ArrayList<>(serviceInstances.size()); List<RemoteInstance> remoteInstanceDetails = new ArrayList<>(serviceInstances.size());
serviceInstances.forEach(serviceInstance -> instanceDetails.add(serviceInstance.getPayload())); serviceInstances.forEach(serviceInstance -> remoteInstanceDetails.add(serviceInstance.getPayload()));
return instanceDetails; return remoteInstanceDetails;
} }
} }
...@@ -19,47 +19,41 @@ ...@@ -19,47 +19,41 @@
package org.apache.skywalking.oap.server.cluster.plugin.zookeeper; package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.util.UUID; import java.util.UUID;
import org.apache.curator.x.discovery.*; import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ZookeeperModuleRegister implements ModuleRegister { public class ZookeeperNodeRegister implements ClusterRegister {
private static final Logger logger = LoggerFactory.getLogger(ZookeeperNodeRegister.class);
private final ServiceDiscovery<InstanceDetails> serviceDiscovery; private final ServiceDiscovery<RemoteInstance> serviceDiscovery;
private final ServiceCacheManager cacheManager; private final String nodeName;
ZookeeperModuleRegister(ServiceDiscovery<InstanceDetails> serviceDiscovery, ZookeeperNodeRegister(ServiceDiscovery<RemoteInstance> serviceDiscovery, String nodeName) {
ServiceCacheManager cacheManager) {
this.serviceDiscovery = serviceDiscovery; this.serviceDiscovery = serviceDiscovery;
this.cacheManager = cacheManager; this.nodeName = nodeName;
} }
@Override public synchronized void register(String moduleName, String providerName, @Override public synchronized void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException {
InstanceDetails instanceDetails) throws ServiceRegisterException {
try { try {
String name = NodeNameBuilder.build(moduleName, providerName); ServiceInstance<RemoteInstance> thisInstance = ServiceInstance.<RemoteInstance>builder()
.name(nodeName)
ServiceInstance<InstanceDetails> thisInstance = ServiceInstance.<InstanceDetails>builder()
.name(NodeNameBuilder.build(moduleName, providerName))
.id(UUID.randomUUID().toString()) .id(UUID.randomUUID().toString())
.address(instanceDetails.getHost()) .address(remoteInstance.getHost())
.port(instanceDetails.getPort()) .port(remoteInstance.getPort())
// .uriSpec(new UriSpec(StringUtils.isEmpty(instanceDetails.getContextPath()) ? StringUtils.EMPTY_STRING : instanceDetails.getContextPath())) .payload(remoteInstance)
.payload(instanceDetails)
.build(); .build();
serviceDiscovery.registerService(thisInstance); serviceDiscovery.registerService(thisInstance);
ServiceCache<InstanceDetails> serviceCache = serviceDiscovery.serviceCacheBuilder()
.name(name)
.build();
serviceCache.start();
cacheManager.put(name, serviceCache);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); logger.error(e.getMessage(), e);
throw new ServiceRegisterException(e.getMessage()); throw new ServiceRegisterException(e.getMessage());
} }
} }
......
...@@ -21,9 +21,16 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper; ...@@ -21,9 +21,16 @@ package org.apache.skywalking.oap.server.cluster.plugin.zookeeper;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.curator.test.TestingServer; import org.apache.curator.test.TestingServer;
import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.junit.*; import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -39,7 +46,7 @@ public class ClusterModuleZookeeperProviderTestCase { ...@@ -39,7 +46,7 @@ public class ClusterModuleZookeeperProviderTestCase {
} }
@Test @Test
public void testStart() throws ServiceNotProvidedException, ModuleStartException, ServiceRegisterException { public void testStart() throws ServiceNotProvidedException, ModuleStartException, ServiceRegisterException, InterruptedException {
ClusterModuleZookeeperProvider provider = new ClusterModuleZookeeperProvider(); ClusterModuleZookeeperProvider provider = new ClusterModuleZookeeperProvider();
ClusterModuleZookeeperConfig moduleConfig = (ClusterModuleZookeeperConfig)provider.createConfigBeanIfAbsent(); ClusterModuleZookeeperConfig moduleConfig = (ClusterModuleZookeeperConfig)provider.createConfigBeanIfAbsent();
moduleConfig.setHostPort(server.getConnectString()); moduleConfig.setHostPort(server.getConnectString());
...@@ -49,19 +56,26 @@ public class ClusterModuleZookeeperProviderTestCase { ...@@ -49,19 +56,26 @@ public class ClusterModuleZookeeperProviderTestCase {
provider.prepare(); provider.prepare();
provider.start(); provider.start();
ModuleRegister moduleRegister = provider.getService(ModuleRegister.class); ClusterRegister moduleRegister = provider.getService(ClusterRegister.class);
ModuleQuery moduleQuery = provider.getService(ModuleQuery.class); ClusterNodesQuery clusterNodesQuery = provider.getService(ClusterNodesQuery.class);
InstanceDetails instanceDetails = new InstanceDetails(); RemoteInstance remoteInstance = new RemoteInstance();
instanceDetails.setHost("ProviderAHost"); remoteInstance.setHost("ProviderAHost");
instanceDetails.setPort(1000); remoteInstance.setPort(1000);
moduleRegister.register("ModuleA", "ProviderA", instanceDetails); moduleRegister.registerRemote(remoteInstance);
for (int i = 0; i < 20; i++) {
List<RemoteInstance> detailsList = clusterNodesQuery.queryRemoteNodes();
if(detailsList.size() == 0){
Thread.sleep(500);
continue;
}
Assert.assertEquals(1, detailsList.size());
Assert.assertEquals("ProviderAHost", detailsList.get(0).getHost());
Assert.assertEquals(1000, detailsList.get(0).getPort());
}
List<InstanceDetails> detailsList = moduleQuery.query("ModuleA", "ProviderA");
Assert.assertEquals(1, detailsList.size());
Assert.assertEquals("ProviderAHost", detailsList.get(0).getHost());
Assert.assertEquals(1000, detailsList.get(0).getPort());
} }
@After @After
......
...@@ -18,14 +18,24 @@ ...@@ -18,14 +18,24 @@
package org.apache.skywalking.oap.server.core; package org.apache.skywalking.oap.server.core;
import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.receiver.*; import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.server.*; import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.library.module.*; import org.apache.skywalking.oap.server.core.receiver.SourceReceiver;
import org.apache.skywalking.oap.server.core.receiver.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.slf4j.*; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
...@@ -68,26 +78,22 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -68,26 +78,22 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl()); this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
} }
@Override public void start() throws ModuleStartException { @Override public void start() {
}
@Override public void notifyAfterCompleted() throws ModuleStartException{
try { try {
grpcServer.start(); grpcServer.start();
jettyServer.start(); jettyServer.start();
} catch (ServerException e) { } catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e); throw new ModuleStartException(e.getMessage(), e);
} }
}
@Override public void notifyAfterCompleted() { RemoteInstance gRPCServerInstance = new RemoteInstance();
InstanceDetails gRPCServerInstance = new InstanceDetails();
gRPCServerInstance.setHost(moduleConfig.getGRPCHost()); gRPCServerInstance.setHost(moduleConfig.getGRPCHost());
gRPCServerInstance.setPort(moduleConfig.getGRPCPort()); gRPCServerInstance.setPort(moduleConfig.getGRPCPort());
this.getManager().find(ClusterModule.NAME).getService(ModuleRegister.class).register(CoreModule.NAME, "gRPC", gRPCServerInstance); this.getManager().find(ClusterModule.NAME).getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
InstanceDetails restServerInstance = new InstanceDetails();
restServerInstance.setHost(moduleConfig.getRestHost());
restServerInstance.setPort(moduleConfig.getRestPort());
restServerInstance.setContextPath(moduleConfig.getRestContextPath());
this.getManager().find(ClusterModule.NAME).getService(ModuleRegister.class).register(CoreModule.NAME, "rest", restServerInstance);
} }
@Override @Override
......
...@@ -32,6 +32,6 @@ public class ClusterModule extends ModuleDefine { ...@@ -32,6 +32,6 @@ public class ClusterModule extends ModuleDefine {
} }
@Override public Class[] services() { @Override public Class[] services() {
return new Class[] {ModuleRegister.class, ModuleQuery.class}; return new Class[] {ClusterRegister.class, ClusterNodesQuery.class};
} }
} }
...@@ -25,7 +25,7 @@ import java.util.List; ...@@ -25,7 +25,7 @@ import java.util.List;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface ModuleQuery extends Service { public interface ClusterNodesQuery extends Service {
List<InstanceDetails> query(String moduleName, String providerName) throws ServiceRegisterException; List<RemoteInstance> queryRemoteNodes();
} }
...@@ -23,8 +23,7 @@ import org.apache.skywalking.oap.server.library.module.Service; ...@@ -23,8 +23,7 @@ import org.apache.skywalking.oap.server.library.module.Service;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface ModuleRegister extends Service { public interface ClusterRegister extends Service {
void register(String moduleName, String providerName, void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException;
InstanceDetails instanceDetails) throws ServiceRegisterException;
} }
...@@ -21,11 +21,10 @@ package org.apache.skywalking.oap.server.core.cluster; ...@@ -21,11 +21,10 @@ package org.apache.skywalking.oap.server.core.cluster;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class InstanceDetails { public class RemoteInstance {
private String host; private String host;
private int port; private int port;
private String contextPath;
public String getHost() { public String getHost() {
return host; return host;
...@@ -42,12 +41,4 @@ public class InstanceDetails { ...@@ -42,12 +41,4 @@ public class InstanceDetails {
public void setPort(int port) { public void setPort(int port) {
this.port = port; this.port = port;
} }
public String getContextPath() {
return contextPath;
}
public void setContextPath(String contextPath) {
this.contextPath = contextPath;
}
} }
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.core.storage.StorageModule
org.apache.skywalking.oap.server.core.cluster.ClusterModule
org.apache.skywalking.oap.server.core.CoreModule
\ No newline at end of file
...@@ -63,7 +63,7 @@ class BootstrapFlow { ...@@ -63,7 +63,7 @@ class BootstrapFlow {
} }
} }
void notifyAfterCompleted() throws ServiceNotProvidedException { void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) { for (ModuleProvider provider : startupSequence) {
provider.notifyAfterCompleted(); provider.notifyAfterCompleted();
} }
......
...@@ -53,7 +53,7 @@ public abstract class ModuleDefine { ...@@ -53,7 +53,7 @@ public abstract class ModuleDefine {
* @throws ProviderNotFoundException when even don't find a single one providers. * @throws ProviderNotFoundException when even don't find a single one providers.
*/ */
void prepare(ModuleManager moduleManager, void prepare(ModuleManager moduleManager,
ApplicationConfiguration.ModuleConfiguration configuration) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException { ApplicationConfiguration.ModuleConfiguration configuration) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException, ModuleStartException {
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class); ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
boolean providerExist = false; boolean providerExist = false;
for (ModuleProvider provider : moduleProviderLoader) { for (ModuleProvider provider : moduleProviderLoader) {
......
...@@ -66,7 +66,7 @@ public abstract class ModuleProvider { ...@@ -66,7 +66,7 @@ public abstract class ModuleProvider {
/** /**
* In prepare stage, the module should initialize things which are irrelative other modules. * In prepare stage, the module should initialize things which are irrelative other modules.
*/ */
public abstract void prepare() throws ServiceNotProvidedException; public abstract void prepare() throws ServiceNotProvidedException, ModuleStartException;
/** /**
* In start stage, the module has been ready for interop. * In start stage, the module has been ready for interop.
...@@ -76,7 +76,7 @@ public abstract class ModuleProvider { ...@@ -76,7 +76,7 @@ public abstract class ModuleProvider {
/** /**
* This callback executes after all modules start up successfully. * This callback executes after all modules start up successfully.
*/ */
public abstract void notifyAfterCompleted() throws ServiceNotProvidedException; public abstract void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException;
/** /**
* @return module names which does this module require? * @return module names which does this module require?
......
...@@ -48,7 +48,6 @@ public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfigur ...@@ -48,7 +48,6 @@ public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfigur
@Override public ApplicationConfiguration load() throws ConfigFileNotFoundException { @Override public ApplicationConfiguration load() throws ConfigFileNotFoundException {
ApplicationConfiguration configuration = new ApplicationConfiguration(); ApplicationConfiguration configuration = new ApplicationConfiguration();
this.loadConfig(configuration); this.loadConfig(configuration);
this.loadDefaultConfig(configuration);
this.overrideConfigBySystemEnv(configuration); this.overrideConfigBySystemEnv(configuration);
return configuration; return configuration;
} }
...@@ -84,31 +83,6 @@ public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfigur ...@@ -84,31 +83,6 @@ public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfigur
} }
} }
@SuppressWarnings("unchecked")
private void loadDefaultConfig(ApplicationConfiguration configuration) throws ConfigFileNotFoundException {
try {
Reader applicationReader = ResourceUtils.read("application-default.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (!configuration.has(moduleName)) {
logger.warn("The {} module did't define in application.yml, use default", moduleName);
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach(properties::put);
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
}
} catch (FileNotFoundException e) {
throw new ConfigFileNotFoundException(e.getMessage(), e);
}
}
private void overrideConfigBySystemEnv(ApplicationConfiguration configuration) { private void overrideConfigBySystemEnv(ApplicationConfiguration configuration) {
for (Map.Entry<Object, Object> prop : System.getProperties().entrySet()) { for (Map.Entry<Object, Object> prop : System.getProperties().entrySet()) {
overrideModuleSettings(configuration, prop.getKey().toString(), prop.getValue().toString()); overrideModuleSettings(configuration, prop.getKey().toString(), prop.getValue().toString());
......
...@@ -15,17 +15,12 @@ ...@@ -15,17 +15,12 @@
# limitations under the License. # limitations under the License.
cluster: cluster:
zookeeper: standalone:
hostPort: localhost:2181 # zookeeper:
# Retry Policy # hostPort: localhost:2181
baseSleepTimeMs: 1000 # initial amount of time to wait between retries # # Retry Policy
maxRetries: 3 # max number of times to retry # baseSleepTimeMs: 1000 # initial amount of time to wait between retries
#naming: # maxRetries: 3 # max number of times to retry
# jetty:
# #OS real network IP(binding required), for agent to find collector cluster
# host: localhost
# port: 10800
# contextPath: /
core: core:
default: default:
restHost: localhost restHost: localhost
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册