提交 a6ca67f6 编写于 作者: P pengys5

main test

上级 29fcbc04
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>skywalking-collector-cluster</module>
<module>skywalking-collector-worker</module>
</modules>
<parent>
<artifactId>skywalking</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<artifactId>skywalking-collector</artifactId>
<packaging>pom</packaging>
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>skywalking-collector-cluster</module>
<module>skywalking-collector-worker</module>
</modules>
<parent>
<artifactId>skywalking</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<artifactId>skywalking-collector</artifactId>
<packaging>pom</packaging>
<properties>
<akka.version>2.4.17</akka.version>
</properties>
<properties>
<akka.version>2.4.17</akka.version>
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-metrics_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.9</version>
</dependency>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-metrics_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-impl-log4j2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -29,7 +29,7 @@ public abstract class AbstractMember<T> {
}
public abstract void preStart() throws Throwable;
public abstract void preStart() throws Exception;
/**
* Receive the message to analyse.
......
......@@ -18,6 +18,7 @@ public abstract class AbstractMemberProvider {
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{MemberSystem.class, ActorRef.class});
memberConstructor.setAccessible(true);
AbstractMember member = (AbstractMember) memberConstructor.newInstance(system, actorRef);
member.preStart();
system.memberOf(member, roleName());
}
......
package com.a.eye.skywalking.collector.actor;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
......@@ -8,6 +9,9 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import scala.Option;
import java.util.List;
......@@ -37,8 +41,22 @@ import java.util.List;
*/
public abstract class AbstractWorker<T> extends UntypedActor {
private Logger logger = LogManager.getFormatterLogger(AbstractWorker.class);
private MemberSystem memberSystem = new MemberSystem();
@Override
public void preStart() throws Exception {
super.preStart();
register();
}
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception {
super.preRestart(reason, message);
register();
}
/**
* Receive the message to analyse.
*
......@@ -55,6 +73,7 @@ public abstract class AbstractWorker<T> extends UntypedActor {
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ClusterEvent.CurrentClusterState) {
logger.info("receive ClusterEvent.CurrentClusterState message");
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
......@@ -62,9 +81,11 @@ public abstract class AbstractWorker<T> extends UntypedActor {
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
logger.info("receive ClusterEvent.MemberUp message");
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
register(memberUp.member());
} else {
} else {
logger.info("message class: %s", message.getClass().getName());
receive(message);
}
}
......@@ -89,12 +110,18 @@ public abstract class AbstractWorker<T> extends UntypedActor {
* @param member is the new created or restart worker
*/
void register(Member member) {
System.out.println("register");
if (member.getRoles().equals(WorkersListener.WorkName)) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
}
}
void register() {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName());
getContext().actorSelection("/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
}
public MemberSystem memberContext() {
return memberSystem;
}
......
......@@ -2,6 +2,8 @@ package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The <code>AbstractWorkerProvider</code> should be implemented by any class whose
......@@ -27,6 +29,8 @@ import akka.actor.Props;
*/
public abstract class AbstractWorkerProvider<T> {
private Logger logger = LogManager.getFormatterLogger(AbstractWorkerProvider.class);
public abstract Class workerClass();
public abstract int workerNum();
......@@ -41,6 +45,7 @@ public abstract class AbstractWorkerProvider<T> {
for (int i = 1; i <= workerNum(); i++) {
system.actorOf(Props.create(workerClass()), roleName() + "_" + i);
logger.info("create akka actor, actor id is %s", roleName() + "_" + i);
}
}
......
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ServiceLoader;
......@@ -13,14 +17,19 @@ import java.util.ServiceLoader;
public enum WorkersCreator {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(WorkersCreator.class);
/**
* create worker to use Java Spi.
*
* @param system is create by akka {@link ActorSystem}
*/
public void boot(ActorSystem system) {
system.actorOf(Props.create(WorkersListener.class), WorkersListener.WorkName);
ServiceLoader<AbstractWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractWorkerProvider.class);
for (AbstractWorkerProvider provider : clusterServiceLoader) {
logger.info("create worker {%s} using java service loader", provider.workerClass().getName());
provider.createWorker(system);
}
}
......
......@@ -3,6 +3,8 @@ package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* <code>WorkersListener</code> listening the register message from workers
......@@ -18,6 +20,8 @@ import akka.actor.UntypedActor;
*/
public class WorkersListener extends UntypedActor {
private Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
public static final String WorkName = "WorkersListener";
@Override
......@@ -27,6 +31,8 @@ public class WorkersListener extends UntypedActor {
ActorRef sender = getSender();
getContext().watch(sender);
logger.info("register worker of role %s", register.getWorkRole());
WorkersRefCenter.INSTANCE.register(sender, register.getWorkRole());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
......
......@@ -28,5 +28,11 @@
<artifactId>transport</artifactId>
<version>5.2.2</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.actor.WorkersCreator;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.NoAvailableWorkerException;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
/**
* @author pengys5
*/
public class CollectorBootStartUp {
public static void main(String[] args) {
// ActorSystem system = ActorSystem.create("ClusterSystem", config);
// system.actorOf(Props.create(TraceConsumerActor.class), Const.Trace_Consumer_Role);
public static void main(String[] args) throws NoAvailableWorkerException, InterruptedException {
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port).
withFallback(ConfigFactory.parseString("akka.cluster.roles = [" + ClusterConfig.Cluster.Current.roles + "]")).
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
WorkersCreator.INSTANCE.boot(system);
}
}
......@@ -13,18 +13,23 @@ import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSum
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ApplicationMember extends AbstractMember {
private Logger logger = LogManager.getFormatterLogger(ApplicationMember.class);
public ApplicationMember(MemberSystem memberSystem, ActorRef actorRef) {
super(memberSystem, actorRef);
}
@Override
public void preStart() throws Throwable {
public void preStart() throws Exception {
logger.info("create members");
TraceSegmentRecordMember.Factory factory = new TraceSegmentRecordMember.Factory();
factory.createWorker(memberContext(), getSelf());
}
......@@ -32,6 +37,7 @@ public class ApplicationMember extends AbstractMember {
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
logger.debug("begin translate TraceSegment Object to JsonObject");
TraceSegment traceSegment = (TraceSegment) message;
AbstractMember discoverMember = memberContext().memberFor(TraceSegmentRecordMember.class.getSimpleName());
discoverMember.receive(traceSegment);
......@@ -67,11 +73,13 @@ public class ApplicationMember extends AbstractMember {
}
private void sendToNodeInstancePersistence(TraceSegment traceSegment) throws Throwable {
String code = traceSegment.getPrimaryRef().getApplicationCode();
String address = traceSegment.getPrimaryRef().getPeerHost();
if (traceSegment.getPrimaryRef() != null) {
String code = traceSegment.getPrimaryRef().getApplicationCode();
String address = traceSegment.getPrimaryRef().getPeerHost();
NodeInstancePersistence.Metric property = new NodeInstancePersistence.Metric(code, address);
tell(new NodeInstancePersistence.Factory(), RollingSelector.INSTANCE, property);
NodeInstancePersistence.Metric property = new NodeInstancePersistence.Metric(code, address);
tell(new NodeInstancePersistence.Factory(), RollingSelector.INSTANCE, property);
}
}
private void sendToResponseCostPersistence(TraceSegment traceSegment) throws Throwable {
......
......@@ -21,12 +21,12 @@ import java.util.Map;
*/
public class TraceSegmentRecordMember extends AbstractMember {
public TraceSegmentRecordMember(MemberSystem memberSystem, ActorRef actorRef) {
public TraceSegmentRecordMember(MemberSystem memberSystem, ActorRef actorRef) throws Throwable {
super(memberSystem, actorRef);
}
@Override
public void preStart() throws Throwable {
public void preStart() throws Exception {
}
@Override
......@@ -53,11 +53,15 @@ public class TraceSegmentRecordMember extends AbstractMember {
traceJsonObj.addProperty("endTime", traceSegment.getEndTime());
traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode());
JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef());
traceJsonObj.add("primaryRef", primaryRefJsonObj);
if (traceSegment.getPrimaryRef() != null) {
JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef());
traceJsonObj.add("primaryRef", primaryRefJsonObj);
}
JsonArray refsJsonArray = parseRefs(traceSegment.getRefs());
traceJsonObj.add("refs", refsJsonArray);
// if (traceSegment.getRefs() != null) {
// JsonArray refsJsonArray = parseRefs(traceSegment.getRefs());
// traceJsonObj.add("refs", refsJsonArray);
// }
JsonArray spanJsonArray = new JsonArray();
for (Span span : traceSegment.getSpans()) {
......
......@@ -13,12 +13,12 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class ApplicationRefMember extends AbstractMember {
public ApplicationRefMember(MemberSystem memberSystem, ActorRef actorRef) {
public ApplicationRefMember(MemberSystem memberSystem, ActorRef actorRef) throws Throwable {
super(memberSystem, actorRef);
}
@Override
public void preStart() throws Throwable {
public void preStart() {
}
......
package com.a.eye.skywalking.collector.worker.metric;
import akka.actor.UntypedActor;
/**
* @author pengys5
*/
public class TraceSegmentRelationActor extends UntypedActor{
@Override
public void onReceive(Object message) throws Throwable {
}
}
......@@ -7,22 +7,28 @@ import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.ApplicationMember;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMember;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class TraceSegmentReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class);
@Override
public void preStart() throws Exception {
ApplicationMember.Factory factory = new ApplicationMember.Factory();
factory.createWorker(memberContext(), getSelf());
super.preStart();
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", traceSegment.getTraceSegmentId());
AbstractMember applicationMember = memberContext().memberFor(ApplicationMember.class.getSimpleName());
applicationMember.receive(traceSegment);
......@@ -32,7 +38,7 @@ public class TraceSegmentReceiver extends AbstractWorker {
}
}
public class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return TraceSegmentReceiver.class;
......
com.a.eye.skywalking.collector.actor.SpiTestWorkerFactory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver$Factory
com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence$Factory
com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence$Factory
com.a.eye.skywalking.collector.worker.applicationref.presistence.DAGNodeRefPersistence$Factory
\ No newline at end of file
cluster.current.hostname = 192.168.0.1
cluster.current.hostname = 127.0.0.1
cluster.current.port = 1000
cluster.current.roles = [Test, Test1]
cluster.current.roles = [TraceSegmentReceiver]
cluster.nodes = [192.168.0.1:1000, 192.168.0.2:1000]
\ No newline at end of file
cluster.nodes = ["akka.tcp://CollectorSystem@127.0.0.1:1000"]
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n" />
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import com.a.eye.skywalking.collector.actor.WorkersCreator;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import java.util.List;
/**
* @author pengys5
*/
public class StartUpTestCase {
@Test
public void test() throws Exception {
ClusterConfigInitializer.initialize("collector.config");
System.out.println(ClusterConfig.Cluster.Current.roles);
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.actor.provider=" + ClusterConfig.Cluster.provider)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)).
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
WorkersCreator.INSTANCE.boot(system);
Thread.sleep(2000);
for (int i = 0; i < 1; i++) {
TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace();
List<WorkerRef> availableWorks = WorkersRefCenter.INSTANCE.availableWorks(TraceSegmentReceiver.class.getSimpleName());
WorkerRef workerRef = RollingSelector.INSTANCE.select(availableWorks, traceSegment);
ActorRef actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRef);
actorRef.tell(traceSegment, ActorRef.noSender());
}
Thread.sleep(10000);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册