提交 1a4a137f 编写于 作者: P pengys5

collector cluster

上级 dd5ce743
......@@ -5,6 +5,8 @@
<modules>
<module>skywalking-collector-cluster</module>
<module>skywalking-collector-worker</module>
<module>../skywalking-collector-actor</module>
<module>skywalking-collector-actor</module>
</modules>
<parent>
<artifactId>skywalking</artifactId>
......@@ -45,6 +47,11 @@
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>skywalking-collector</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-collector-actor</artifactId>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.api.util.StringUtil;
/**
* @author pengys5
*/
public abstract class AbstractWorkerProvider {
public abstract String workerName();
public abstract Class workerClass();
public abstract int workerNum();
public void createWorker(ActorSystem system) {
if (StringUtil.isEmpty(workerName())) {
throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerName()");
}
if (workerClass() == null) {
throw new IllegalArgumentException("cannot createWorker() with anything not obtained from workerClass()");
}
if (workerNum() <= 0) {
throw new IllegalArgumentException("cannot workerNum() with obtained from workerNum() must greater than 0");
}
for (int i = 1; i <= workerNum(); i++) {
system.actorOf(Props.create(workerClass()), workerName() + "_" + i);
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
/**
* @author pengys5
*/
public class CollectorBoot {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("ClusterSystem", config);
}
}
package com.a.eye.skywalking.collector.cluster.config;
package com.a.eye.skywalking.collector.actor;
/**
* Created by pengys5 on 2017/2/22 0022.
......@@ -12,8 +12,8 @@ public class CollectorConfig {
public static String port = "2551";
public static String cluster = "127.0.0.1:2551";
public static class Actor {
public static int ActorManagerActor_Num = 2;
public static class Worker {
public static int ApplicationDiscoverMetric_Num = 2;
}
}
}
package com.a.eye.skywalking.collector.cluster.config;
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.api.logging.api.ILog;
import com.a.eye.skywalking.api.logging.api.LogManager;
......@@ -9,7 +9,7 @@ import java.io.InputStream;
import java.util.Properties;
/**
* Created by pengys5 on 2017/2/22 0022.
* @author pengys5
*/
public class CollectorConfigInitializer {
......
package com.a.eye.skywalking.collector.cluster.manager;
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import java.util.List;
/**
* Created by wusheng on 2017/2/21.
* @author wusheng
*/
public interface RefRouter {
ActorRef find(List<ActorRef> candidates);
......
package com.a.eye.skywalking.collector.cluster;
/**
* @author pengys5
*/
public class ClusterConfig {
public static class Cluster {
public static class Current {
public static String hostname = "127.0.0.1";
public static String port = "2551";
public static String roles = "";
}
public static String nodes = "127.0.0.1:2551";
public static final String appname = "CollectorSystem";
public static final String provider = "akka.cluster.ClusterActorRefProvider";
}
}
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.api.logging.api.ILog;
import com.a.eye.skywalking.api.logging.api.LogManager;
import com.a.eye.skywalking.api.util.ConfigInitializer;
import com.a.eye.skywalking.api.util.StringUtil;
import java.io.InputStream;
import java.util.Properties;
/**
* @author pengys5
*/
public class ClusterConfigInitializer {
private static ILog logger = LogManager.getLogger(ClusterConfigInitializer.class);
public static final String ConfigFileName = "collector.config";
public static void initialize(String configFileName) {
InputStream configFileStream = ClusterConfigInitializer.class.getResourceAsStream("/" + configFileName);
if (configFileStream == null) {
logger.info("Not provide sky-walking certification documents, sky-walking api run in default config.");
} else {
try {
Properties properties = new Properties();
properties.load(configFileStream);
ConfigInitializer.initialize(properties, ClusterConfig.class);
} catch (Exception e) {
logger.error("Failed to read the config file, sky-walking api run in default config.", e);
}
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.hostname"))) {
ClusterConfig.Cluster.Current.hostname = System.getProperty("cluster.current.hostname");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.port"))) {
ClusterConfig.Cluster.Current.port = System.getProperty("cluster.current.port");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.roles"))) {
ClusterConfig.Cluster.Current.roles = System.getProperty("cluster.current.roles");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.nodes"))) {
ClusterConfig.Cluster.nodes = System.getProperty("cluster.nodes");
}
}
}
package com.a.eye.skywalking.collector.cluster;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public class Const {
public static final String Actor_Manager_Path = "/user/" + Const.Actor_Manager_Role;
public static final String Actor_Manager_Role = "Actor_Manager_Role";
public static final String Trace_Producer_Role = "Trace_Producer_Role";
public static final String Trace_Consumer_Role = "Trace_Consumer_Role";
}
package com.a.eye.skywalking.collector.cluster;
import java.io.Serializable;
/**
* @author pengys5
*/
public class WorkerListenerMessage {
public static class RegisterMessage implements Serializable {
public final String role;
public RegisterMessage(String role) {
this.role = role;
}
public String getRole() {
return role;
}
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
/**
* @author pengys5
*/
public class WorkersListener extends UntypedActor {
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof WorkerListenerMessage.RegisterMessage) {
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
ActorRef sender = getSender();
getContext().watch(sender);
WorkersRefCenter.INSTANCE.register(sender, register.getRole());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
WorkersRefCenter.INSTANCE.unregister(terminated.getActor());
} else {
unhandled(message);
}
}
}
package com.a.eye.skywalking.collector.cluster.manager;
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
......@@ -9,20 +9,20 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <code>ActorRefCenter</code> represent a cache center,
* <code>WorkersRefCenter</code> represent a cache center,
* store all {@link ActorRef}s, each of them represent a Akka Actor instance.
* All the Actors in this JVM, can find alive-actor in here, and send message.
*
* @author wusheng
*/
public enum ActorRefCenter {
public enum WorkersRefCenter {
INSTANCE;
private Map<String, List<ActorRef>> roleToActor = new ConcurrentHashMap();
private Map<ActorRef, String> actorToRole = new ConcurrentHashMap();
public void register(ActorRef newRef, String name){
public void register(ActorRef newRef, String name) {
if (!roleToActor.containsKey(name)) {
List<ActorRef> actorList = Collections.synchronizedList(new ArrayList<ActorRef>());
roleToActor.putIfAbsent(name, actorList);
......@@ -31,17 +31,17 @@ public enum ActorRefCenter {
actorToRole.put(newRef, name);
}
public void unregister(ActorRef newRef){
public void unregister(ActorRef newRef) {
String role = actorToRole.get(newRef);
roleToActor.get(role).remove(newRef);
actorToRole.remove(newRef);
}
public ActorRef find(String name, RefRouter router){
return router.find(roleToActor.get(name));
}
// public ActorRef find(String name, RefRouter router) {
// return router.find(roleToActor.get(name));
// }
public int sizeOf(String name){
public int sizeOf(String name) {
return roleToActor.get(name).size();
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.cluster.base;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
/**
* @author pengys5
*/
public abstract class AbstractUntypedActor extends UntypedActor {
}
package com.a.eye.skywalking.collector.cluster.base;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
/**
* @author pengys5
*/
public interface IActorProvider {
public String actorName();
public void createActor(ActorSystem system);
public void actorOf(ActorSystem system, String actorInClusterName);
}
package com.a.eye.skywalking.collector.cluster.consumer;
import akka.cluster.ClusterEvent;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationResult;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import org.springframework.context.annotation.Scope;
//@Named("TraceConsumerActor")
@Scope("prototype")
public class TraceConsumerActor extends UntypedActor {
Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
cluster.subscribe(getSelf(), ClusterEvent.MemberEvent.class);
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob) message;
getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf());
} else if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
register(mUp.member());
} else {
unhandled(message);
}
}
void register(Member member) {
System.out.println("register");
if (member.hasRole(Const.Trace_Producer_Role)) {
System.out.println("register: " + Const.Trace_Producer_Role);
ActorRegisterMessage.RegisterMessage registerMessage = new ActorRegisterMessage.RegisterMessage(Const.Trace_Consumer_Role, "");
getContext().actorSelection(member.address() + Const.Actor_Manager_Path).tell(registerMessage, getSelf());
}
}
}
package com.a.eye.skywalking.collector.cluster.consumer;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class TraceConsumerApp {
public static void main(String[] args) throws InterruptedException {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "2551";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterSystem", config);
// system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
system.actorOf(Props.create(TraceConsumerActor.class), Const.Trace_Consumer_Role);
}
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.cluster.base.IActorProvider;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ActorCreator {
INSTANCE;
public void create(ActorSystem system) {
ServiceLoader<IActorProvider> serviceLoader = ServiceLoader.load(IActorProvider.class);
for (IActorProvider service : serviceLoader) {
service.createActor(system);
}
}
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.Terminated;
import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor;
import com.a.eye.skywalking.collector.cluster.base.IActorProvider;
import com.a.eye.skywalking.collector.cluster.message.ActorRegisterMessage;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public class ActorManagerActor extends AbstractUntypedActor {
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ActorRegisterMessage.RegisterMessage) {
System.out.println("RegisterMessage");
ActorRegisterMessage.RegisterMessage regist = (ActorRegisterMessage.RegisterMessage) message;
getContext().watch(getSender());
ActorRefCenter.INSTANCE.register(getSender(), regist.getRole());
} else if (message instanceof Terminated) {
System.out.println("Terminated");
Terminated terminated = (Terminated) message;
ActorRefCenter.INSTANCE.unregister(terminated.getActor());
} else {
unhandled(message);
}
}
}
package com.a.eye.skywalking.collector.cluster.manager;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.cluster.base.IActorProvider;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
/**
* @author pengys5
*/
public class ActorManagerActorFactory implements IActorProvider {
@Override
public String actorName() {
return "ActorManagerActor";
}
@Override
public void createActor(ActorSystem system) {
for (int i = 1; i <= CollectorConfig.Collector.Actor.ActorManagerActor_Num; i++) {
actorOf(system, actorName() + "_" + i);
}
}
@Override
public void actorOf(ActorSystem system, String actorInClusterName) {
system.actorOf(Props.create(ActorManagerActor.class), actorInClusterName);
}
}
package com.a.eye.skywalking.collector.cluster.message;
import java.io.Serializable;
/**
* Created by Administrator on 2017/2/21 0021.
*/
public interface ActorRegisterMessage {
public static class RegisterMessage implements Serializable {
public final String role;
public final String action;
public RegisterMessage(String role, String action) {
this.role = role;
this.action = action;
}
public String getRole() {
return role;
}
public String getAction() {
return action;
}
}
public static class RegisteMessageResult implements Serializable{
public final String role;
public final Integer value;
public RegisteMessageResult(String role, Integer value){
this.role = role;
this.value = value;
}
public String getRole() {
return role;
}
public Integer getValue() {
return value;
}
}
}
package com.a.eye.skywalking.collector.cluster.message;
import com.a.eye.skywalking.trace.TraceSegment;
import java.io.Serializable;
//#messages
public interface TraceMessages {
public static class TransformationJob implements Serializable {
private final String text;
private final TraceSegment traceSegment;
public TransformationJob(String text, TraceSegment traceSegment) {
this.text = text;
this.traceSegment = traceSegment;
}
public String getText() {
return text;
}
public TraceSegment getTraceSegment() {
return traceSegment;
}
}
public static class TransformationResult implements Serializable {
private final String text;
public TransformationResult(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public String toString() {
return "TransformationResult(" + text + ")";
}
}
public static class JobFailed implements Serializable {
private final String reason;
private final TransformationJob job;
public JobFailed(String reason, TransformationJob job) {
this.reason = reason;
this.job = job;
}
public String getReason() {
return reason;
}
public TransformationJob getJob() {
return job;
}
@Override
public String toString() {
return "JobFailed(" + reason + ")";
}
}
public static final String BACKEND_REGISTRATION = "BackendRegistration";
}
//#messages
\ No newline at end of file
package com.a.eye.skywalking.collector.cluster.producer;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.manager.ActorRefCenter;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.JobFailed;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import akka.actor.UntypedActor;
import org.springframework.context.annotation.Scope;
//#frontend
//@Named("TraceProducerActor")
@Scope("prototype")
public class TraceProducerActor extends UntypedActor {
int jobCounter = 0;
@Override
public void onReceive(Object message) {
int actorSize = ActorRefCenter.INSTANCE.sizeOf(Const.Trace_Consumer_Role);
if (actorSize == 0) {
System.out.println("actorList null");
} else {
System.out.println("sizeOf: " + actorSize);
}
if ((message instanceof TransformationJob) && actorSize == 0) {
TransformationJob job = (TransformationJob)message;
getSender().tell(new JobFailed("Service unavailable, try again later", job), getSender());
} else if (message instanceof TransformationJob) {
TransformationJob job = (TransformationJob)message;
jobCounter++;
ActorRefCenter.INSTANCE.find(Const.Trace_Consumer_Role,
(candidates) -> candidates.get(jobCounter % candidates.size()));
} else {
unhandled(message);
}
}
}
//#frontend
package com.a.eye.skywalking.collector.cluster.producer;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfigInitializer;
import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static akka.pattern.Patterns.ask;
/**
* {@link TraceProducerApp} is a producer for trace agent to send {@link TraceSegment}.
* <p>
* Created by pengys5 on 2017/2/17.
*/
public class TraceProducerApp {
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
final Config config = TraceProducerApp.buildConfig();
ActorSystem system = ActorSystem.create(CollectorConfig.appname, config);
system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
final ActorRef frontend = system.actorOf(Props.create(TraceProducerActor.class), Const.Trace_Producer_Role);
final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ExecutionContext ec = system.dispatcher();
final AtomicInteger counter = new AtomicInteger();
system.scheduler().schedule(interval, interval, () -> {
ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet(), null), timeout).onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
}
}, ec);
}, ec);
}
public static Config buildConfig() {
CollectorConfigInitializer.initialize();
Config config = ConfigFactory.parseString("akka.actor.provider = akka.cluster.ClusterActorRefProvider")
.withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname = " + CollectorConfig.Collector.hostname))
.withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port = " + CollectorConfig.Collector.port))
.withFallback(ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off"))
.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes = [" + TraceProducerApp.buildSeedNodes(CollectorConfig.Collector.cluster) + "]"))
.withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 10s"))
.withFallback(ConfigFactory.parseString("akka.cluster.roles = [Actor_Manager_Role, Trace_Producer_Role, Trace_Consumer_Role]"))
.withFallback(ConfigFactory.parseString("akka.cluster.metrics.enabled = off"));
// .withFallback(ConfigFactory.load());
return config;
}
public static String buildSeedNodes(String cluster) {
String[] clusters = cluster.split(",");
StringBuffer seedNodes = new StringBuffer();
for (int i = 0; i < clusters.length; i++) {
if (i > 0) {
seedNodes.append(",");
}
seedNodes.append("\"akka.tcp://").append(CollectorConfig.appname).append("@");
seedNodes.append(clusters[i]).append("\"");
}
return seedNodes.toString();
}
}
#//#snippet
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
#//#snippet
# excluded from snippet
auto-down-unreachable-after = 10s
#//#snippet
# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
roles = [Actor_Manager_Role, Trace_Producer_Role, Trace_Consumer_Role]
# Disable legacy metrics in akka-cluster.
metrics.enabled=off
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.cluster.manager.ActorCreator;
import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
import org.junit.Test;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
public class ActorCreatorTestCase {
@Test
public void testCreate() {
ActorSystem system = mock(ActorSystem.class);
// ActorCreator.INSTANCE.create(system, ActorManagerActor.class, 1);
// verify(system).actorOf(Props.create(ActorManagerActor.class), "ActorManagerActor");
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActorFactory;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
/**
* @author pengys5
*/
public class ActorProviderTestCase {
@Test
public void testActorName() {
ActorManagerActorFactory factory = new ActorManagerActorFactory();
String actorName = factory.actorName();
Assert.assertEquals("ActorManagerActor", actorName);
}
@Test
public void testCreateActor() {
ActorSystem system = Mockito.mock(ActorSystem.class);
ActorManagerActorFactory factory = new ActorManagerActorFactory();
factory.createActor(system);
}
}
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfigInitializer;
import com.a.eye.skywalking.collector.cluster.producer.TraceProducerApp;
import com.typesafe.config.Config;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Created by pengys5 on 2017/2/22 0022.
* @author pengys5
*/
public class CollectorConfigTestCase {
@Test
public void testConfigInitializer() {
System.setProperty("collector.hostname", "192.168.0.1");
System.setProperty("collector.port", "1000");
System.setProperty("collector.cluster", "192.168.0.1:1000");
CollectorConfigInitializer.initialize();
Assert.assertEquals("192.168.0.1", CollectorConfig.Collector.hostname);
Assert.assertEquals("1000", CollectorConfig.Collector.port);
Assert.assertEquals("192.168.0.1:1000", CollectorConfig.Collector.cluster);
@Before
public void resetArguments() {
System.clearProperty("cluster.current.hostname");
System.clearProperty("cluster.current.port");
System.clearProperty("cluster.current.roles");
System.clearProperty("cluster.nodes");
}
@Test
public void testBuildSeedNodes() {
String seedNodesContainOne = TraceProducerApp.buildSeedNodes("192.168.0.1:1000");
Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1000\"", seedNodesContainOne);
String seedNodesContainTwo = TraceProducerApp.buildSeedNodes("192.168.0.1:1001,192.168.0.2:1002");
Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1001\",\"akka.tcp://CollectorSystem@192.168.0.2:1002\"", seedNodesContainTwo);
String seedNodesContainThree = TraceProducerApp.buildSeedNodes("192.168.0.1:1001,192.168.0.2:1002,192.168.0.3:1003");
Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1001\",\"akka.tcp://CollectorSystem@192.168.0.2:1002\",\"akka.tcp://CollectorSystem@192.168.0.3:1003\"", seedNodesContainThree);
public void testInitializeUseConfigFile() {
ClusterConfigInitializer.initialize("collector.config");
Assert.assertEquals("192.168.0.1", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1000", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("[Test, Test1]", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("[192.168.0.1:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes);
}
@Test
public void testBuildConfig() {
Config config = TraceProducerApp.buildConfig();
Assert.assertEquals("akka.cluster.ClusterActorRefProvider", config.getString("akka.actor.provider"));
Assert.assertEquals("10s", config.getString("akka.cluster.auto-down-unreachable-after"));
Assert.assertEquals("off", config.getString("akka.cluster.metrics.enabled"));
Assert.assertEquals("off", config.getString("akka.remote.log-remote-lifecycle-events"));
Assert.assertEquals("127.0.0.1", config.getString("akka.remote.netty.tcp.hostname"));
Assert.assertEquals("2551", config.getString("akka.remote.netty.tcp.port"));
String[] roles = {"Actor_Manager_Role", "Trace_Producer_Role", "Trace_Consumer_Role"};
Assert.assertArrayEquals(roles, config.getStringList("akka.cluster.roles").toArray());
public void testInitializeUseArguments() {
System.setProperty("cluster.current.hostname", "192.168.0.2");
System.setProperty("cluster.current.port", "1001");
System.setProperty("cluster.current.roles", "Test3, Test4");
System.setProperty("cluster.nodes", "[192.168.0.2:1000, 192.168.0.2:1000]");
ClusterConfigInitializer.initialize("collector.config");
Assert.assertEquals("192.168.0.2", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1001", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("Test3, Test4", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("[192.168.0.2:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes);
}
}
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import org.junit.Test;
/**
* Created by pengys5 on 2017/2/22 0022.
*/
public class TraceSegmentTestCase {
@Test
public void testProducerSend() {
TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace();
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import scala.concurrent.Future;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class WorkerListenerTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
MemberModifier.field(WorkersRefCenter.class, "actorToRole").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
MemberModifier.field(WorkersRefCenter.class, "roleToActor").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
}
@Test
public void testRegister() throws IllegalAccessException {
final Props props = Props.create(WorkersListener.class);
final TestActorRef<WorkersListener> senderActorRef = TestActorRef.create(system, props, "WorkersListenerSender");
final TestActorRef<WorkersListener> receiveactorRef = TestActorRef.create(system, props, "WorkersListenerReceive");
WorkerListenerMessage.RegisterMessage message = new WorkerListenerMessage.RegisterMessage("WorkersListener");
receiveactorRef.tell(message, senderActorRef);
Map<ActorRef, String> actorToRole = (Map<ActorRef, String>) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE);
Assert.assertEquals("WorkersListener", actorToRole.get(senderActorRef));
Map<String, List<ActorRef>> roleToActor = (Map<String, List<ActorRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE);
ActorRef[] actorRefs = {senderActorRef};
Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray());
}
@Test
public void testTerminated() throws IllegalAccessException {
final Props props = Props.create(WorkersListener.class);
final TestActorRef<WorkersListener> senderActorRef = TestActorRef.create(system, props, "WorkersListenerSender");
final TestActorRef<WorkersListener> receiveactorRef = TestActorRef.create(system, props, "WorkersListenerReceive");
WorkerListenerMessage.RegisterMessage message = new WorkerListenerMessage.RegisterMessage("WorkersListener");
receiveactorRef.tell(message, senderActorRef);
senderActorRef.stop();
Map<ActorRef, String> actorToRole = (Map<ActorRef, String>) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE);
Assert.assertEquals(null, actorToRole.get(senderActorRef));
Map<String, List<ActorRef>> roleToActor = (Map<String, List<ActorRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE);
ActorRef[] actorRefs = {};
Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray());
}
@Test
public void testUnhandled() {
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class WorkersRefCenterTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
MemberModifier.field(WorkersRefCenter.class, "actorToRole").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
MemberModifier.field(WorkersRefCenter.class, "roleToActor").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
}
@Test
public void testRegister() throws IllegalAccessException {
final Props props = Props.create(WorkersListener.class);
final TestActorRef<WorkersListener> actorRef1 = TestActorRef.create(system, props, "WorkersListener1");
final TestActorRef<WorkersListener> actorRef2 = TestActorRef.create(system, props, "WorkersListener2");
final TestActorRef<WorkersListener> actorRef3 = TestActorRef.create(system, props, "WorkersListener3");
WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener");
Map<ActorRef, String> actorToRole = (Map<ActorRef, String>) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE);
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef1));
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef2));
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef3));
Map<String, List<ActorRef>> roleToActor = (Map<String, List<ActorRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE);
ActorRef[] actorRefs = {actorRef1, actorRef2, actorRef3};
Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray());
}
@Test
public void testUnRegister() throws IllegalAccessException {
final Props props = Props.create(WorkersListener.class);
final TestActorRef<WorkersListener> actorRef1 = TestActorRef.create(system, props, "WorkersListener1");
final TestActorRef<WorkersListener> actorRef2 = TestActorRef.create(system, props, "WorkersListener2");
final TestActorRef<WorkersListener> actorRef3 = TestActorRef.create(system, props, "WorkersListener3");
WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener");
Map<ActorRef, String> actorToRole = (Map<ActorRef, String>) MemberModifier.field(WorkersRefCenter.class, "actorToRole").get(WorkersRefCenter.INSTANCE);
Map<String, List<ActorRef>> roleToActor = (Map<String, List<ActorRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToActor").get(WorkersRefCenter.INSTANCE);
WorkersRefCenter.INSTANCE.unregister(actorRef1);
Assert.assertEquals(null, actorToRole.get(actorRef1));
ActorRef[] actorRefs = {actorRef2, actorRef3};
Assert.assertArrayEquals(actorRefs, roleToActor.get("WorkersListener").toArray());
}
@Test
public void testSizeOf(){
final Props props = Props.create(WorkersListener.class);
final TestActorRef<WorkersListener> actorRef1 = TestActorRef.create(system, props, "WorkersListener1");
final TestActorRef<WorkersListener> actorRef2 = TestActorRef.create(system, props, "WorkersListener2");
final TestActorRef<WorkersListener> actorRef3 = TestActorRef.create(system, props, "WorkersListener3");
WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener");
Assert.assertEquals(3, WorkersRefCenter.INSTANCE.sizeOf("WorkersListener"));
}
}
cluster.current.hostname = 192.168.0.1
cluster.current.port = 1000
cluster.current.roles = [Test, Test1]
cluster.nodes = [192.168.0.1:1000, 192.168.0.2:1000]
\ No newline at end of file
......@@ -14,7 +14,7 @@
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-collector-cluster</artifactId>
<artifactId>skywalking-collector-actor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
package com.a.eye.skywalking.collector.worker.metric;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
/**
* @author pengys5
*/
public class ApplicationDiscoerWorkerFactory extends AbstractWorkerProvider {
public static final String WorkerName = "ApplicationDiscoverMetric";
@Override
public String workerName() {
return WorkerName;
}
@Override
public Class workerClass() {
return ApplicationDiscoverMetric.class;
}
@Override
public int workerNum() {
return CollectorConfig.Collector.Worker.ApplicationDiscoverMetric_Num;
}
}
package com.a.eye.skywalking.collector.worker.indicator;
package com.a.eye.skywalking.collector.worker.metric;
import akka.actor.UntypedActor;
import com.a.eye.skywalking.collector.cluster.base.AbstractUntypedActor;
/**
* @author pengys5
*/
public class ApplicationDiscoverActor extends AbstractUntypedActor {
public static final String ActorName = "ApplicationDiscoverActor";
@Override
public String actorName() {
return ActorName;
}
public class ApplicationDiscoverMetric extends AbstractUntypedActor {
@Override
public void onReceive(Object message) throws Throwable {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册