From a6ca67f6bf08d26f494cd56ca8d697ca25897c38 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Fri, 3 Mar 2017 20:14:29 +0800 Subject: [PATCH] main test --- skywalking-collector/pom.xml | 120 ++++++++++-------- .../collector/actor/AbstractMember.java | 2 +- .../actor/AbstractMemberProvider.java | 1 + .../collector/actor/AbstractWorker.java | 29 ++++- .../actor/AbstractWorkerProvider.java | 5 + .../collector/actor/WorkersCreator.java | 9 ++ .../collector/cluster/WorkersListener.java | 6 + .../skywalking-collector-worker/pom.xml | 6 + .../worker/CollectorBootStartUp.java | 21 ++- .../worker/application/ApplicationMember.java | 18 ++- .../metric/TraceSegmentRecordMember.java | 16 ++- .../applicationref/ApplicationRefMember.java | 4 +- .../metric/TraceSegmentRelationActor.java | 13 -- .../worker/receiver/TraceSegmentReceiver.java | 8 +- ...ing.collector.actor.AbstractWorkerProvider | 9 +- .../src/main/resources/collector.config | 6 +- .../src/main/resources/log4j2.xml | 13 ++ .../collector/worker/StartUpTestCase.java | 54 ++++++++ 18 files changed, 249 insertions(+), 91 deletions(-) delete mode 100644 skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/TraceSegmentRelationActor.java create mode 100644 skywalking-collector/skywalking-collector-worker/src/main/resources/log4j2.xml create mode 100644 skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java diff --git a/skywalking-collector/pom.xml b/skywalking-collector/pom.xml index 0b7ddef07..2aaca88e7 100644 --- a/skywalking-collector/pom.xml +++ b/skywalking-collector/pom.xml @@ -1,61 +1,71 @@ - 4.0.0 - - skywalking-collector-cluster - skywalking-collector-worker - - - skywalking - com.a.eye - 3.0-2017 - - skywalking-collector - pom + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + 4.0.0 + + skywalking-collector-cluster + skywalking-collector-worker + + + skywalking + com.a.eye + 3.0-2017 + + skywalking-collector + pom - - 2.4.17 - + + 2.4.17 + - - - com.typesafe.akka - akka-cluster_2.11 - ${akka.version} - - - com.typesafe.akka - akka-cluster-metrics_2.11 - ${akka.version} - - - com.typesafe.akka - akka-cluster-tools_2.11 - ${akka.version} - - - com.typesafe.akka - akka-persistence_2.11 - ${akka.version} - - - org.iq80.leveldb - leveldb - 0.9 - + + + com.typesafe.akka + akka-cluster_2.11 + ${akka.version} + + + com.typesafe.akka + akka-cluster-metrics_2.11 + ${akka.version} + + + com.typesafe.akka + akka-cluster-tools_2.11 + ${akka.version} + + + com.typesafe.akka + akka-persistence_2.11 + ${akka.version} + + + org.iq80.leveldb + leveldb + 0.9 + + + com.a.eye + skywalking-logging-api + ${project.version} + + + com.a.eye + skywalking-logging-impl-log4j2 + ${project.version} + - - com.typesafe.akka - akka-testkit_2.11 - ${akka.version} - test - - - com.a.eye - skywalking-sniffer-mock - ${project.version} - test - - + + com.typesafe.akka + akka-testkit_2.11 + ${akka.version} + test + + + com.a.eye + skywalking-sniffer-mock + ${project.version} + test + + diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java index 9c6f743ba..8514ebeaf 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java @@ -29,7 +29,7 @@ public abstract class AbstractMember { } - public abstract void preStart() throws Throwable; + public abstract void preStart() throws Exception; /** * Receive the message to analyse. diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java index 9f47d8d35..16c5041e2 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java @@ -18,6 +18,7 @@ public abstract class AbstractMemberProvider { Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{MemberSystem.class, ActorRef.class}); memberConstructor.setAccessible(true); AbstractMember member = (AbstractMember) memberConstructor.newInstance(system, actorRef); + member.preStart(); system.memberOf(member, roleName()); } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java index f98072a44..c11fb494c 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java @@ -1,5 +1,6 @@ package com.a.eye.skywalking.collector.actor; +import akka.actor.Terminated; import akka.actor.UntypedActor; import akka.cluster.ClusterEvent; import akka.cluster.Member; @@ -8,6 +9,9 @@ import com.a.eye.skywalking.collector.actor.selector.WorkerSelector; import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage; import com.a.eye.skywalking.collector.cluster.WorkersListener; import com.a.eye.skywalking.collector.cluster.WorkersRefCenter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import scala.Option; import java.util.List; @@ -37,8 +41,22 @@ import java.util.List; */ public abstract class AbstractWorker extends UntypedActor { + private Logger logger = LogManager.getFormatterLogger(AbstractWorker.class); + private MemberSystem memberSystem = new MemberSystem(); + @Override + public void preStart() throws Exception { + super.preStart(); + register(); + } + + @Override + public void preRestart(Throwable reason, Option message) throws Exception { + super.preRestart(reason, message); + register(); + } + /** * Receive the message to analyse. * @@ -55,6 +73,7 @@ public abstract class AbstractWorker extends UntypedActor { @Override public void onReceive(Object message) throws Throwable { if (message instanceof ClusterEvent.CurrentClusterState) { + logger.info("receive ClusterEvent.CurrentClusterState message"); ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message; for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { @@ -62,9 +81,11 @@ public abstract class AbstractWorker extends UntypedActor { } } } else if (message instanceof ClusterEvent.MemberUp) { + logger.info("receive ClusterEvent.MemberUp message"); ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message; register(memberUp.member()); - } else { + } else { + logger.info("message class: %s", message.getClass().getName()); receive(message); } } @@ -89,12 +110,18 @@ public abstract class AbstractWorker extends UntypedActor { * @param member is the new created or restart worker */ void register(Member member) { + System.out.println("register"); if (member.getRoles().equals(WorkersListener.WorkName)) { WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName()); getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf()); } } + void register() { + WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName()); + getContext().actorSelection("/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf()); + } + public MemberSystem memberContext() { return memberSystem; } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java index 17e58f012..d3927754f 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorkerProvider.java @@ -2,6 +2,8 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.ActorSystem; import akka.actor.Props; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * The AbstractWorkerProvider should be implemented by any class whose @@ -27,6 +29,8 @@ import akka.actor.Props; */ public abstract class AbstractWorkerProvider { + private Logger logger = LogManager.getFormatterLogger(AbstractWorkerProvider.class); + public abstract Class workerClass(); public abstract int workerNum(); @@ -41,6 +45,7 @@ public abstract class AbstractWorkerProvider { for (int i = 1; i <= workerNum(); i++) { system.actorOf(Props.create(workerClass()), roleName() + "_" + i); + logger.info("create akka actor, actor id is %s", roleName() + "_" + i); } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java index 9a063d95d..c4b88371c 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/WorkersCreator.java @@ -1,6 +1,10 @@ package com.a.eye.skywalking.collector.actor; import akka.actor.ActorSystem; +import akka.actor.Props; +import com.a.eye.skywalking.collector.cluster.WorkersListener; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.ServiceLoader; @@ -13,14 +17,19 @@ import java.util.ServiceLoader; public enum WorkersCreator { INSTANCE; + private Logger logger = LogManager.getFormatterLogger(WorkersCreator.class); + /** * create worker to use Java Spi. * * @param system is create by akka {@link ActorSystem} */ public void boot(ActorSystem system) { + system.actorOf(Props.create(WorkersListener.class), WorkersListener.WorkName); + ServiceLoader clusterServiceLoader = ServiceLoader.load(AbstractWorkerProvider.class); for (AbstractWorkerProvider provider : clusterServiceLoader) { + logger.info("create worker {%s} using java service loader", provider.workerClass().getName()); provider.createWorker(system); } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java index 29fa694ab..b7e0fbb18 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/cluster/WorkersListener.java @@ -3,6 +3,8 @@ package com.a.eye.skywalking.collector.cluster; import akka.actor.ActorRef; import akka.actor.Terminated; import akka.actor.UntypedActor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * WorkersListener listening the register message from workers @@ -18,6 +20,8 @@ import akka.actor.UntypedActor; */ public class WorkersListener extends UntypedActor { + private Logger logger = LogManager.getFormatterLogger(WorkersListener.class); + public static final String WorkName = "WorkersListener"; @Override @@ -27,6 +31,8 @@ public class WorkersListener extends UntypedActor { ActorRef sender = getSender(); getContext().watch(sender); + logger.info("register worker of role %s", register.getWorkRole()); + WorkersRefCenter.INSTANCE.register(sender, register.getWorkRole()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; diff --git a/skywalking-collector/skywalking-collector-worker/pom.xml b/skywalking-collector/skywalking-collector-worker/pom.xml index cd53c5494..15ff30c96 100644 --- a/skywalking-collector/skywalking-collector-worker/pom.xml +++ b/skywalking-collector/skywalking-collector-worker/pom.xml @@ -28,5 +28,11 @@ transport 5.2.2 + + com.a.eye + skywalking-sniffer-mock + ${project.version} + test + \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java index e8baddfdc..be13321e6 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/CollectorBootStartUp.java @@ -1,12 +1,27 @@ package com.a.eye.skywalking.collector.worker; +import akka.actor.ActorSystem; +import com.a.eye.skywalking.collector.actor.WorkersCreator; +import com.a.eye.skywalking.collector.cluster.ClusterConfig; +import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer; +import com.a.eye.skywalking.collector.cluster.NoAvailableWorkerException; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + /** * @author pengys5 */ public class CollectorBootStartUp { - public static void main(String[] args) { -// ActorSystem system = ActorSystem.create("ClusterSystem", config); -// system.actorOf(Props.create(TraceConsumerActor.class), Const.Trace_Consumer_Role); + public static void main(String[] args) throws NoAvailableWorkerException, InterruptedException { + ClusterConfigInitializer.initialize("collector.config"); + + final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port). + withFallback(ConfigFactory.parseString("akka.cluster.roles = [" + ClusterConfig.Cluster.Current.roles + "]")). + withFallback(ConfigFactory.load()); + + + ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config); + WorkersCreator.INSTANCE.boot(system); } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java index 5caae1fbd..d2222f016 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java @@ -13,18 +13,23 @@ import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSum import com.a.eye.skywalking.trace.Span; import com.a.eye.skywalking.trace.TraceSegment; import com.a.eye.skywalking.trace.tag.Tags; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * @author pengys5 */ public class ApplicationMember extends AbstractMember { + private Logger logger = LogManager.getFormatterLogger(ApplicationMember.class); + public ApplicationMember(MemberSystem memberSystem, ActorRef actorRef) { super(memberSystem, actorRef); } @Override - public void preStart() throws Throwable { + public void preStart() throws Exception { + logger.info("create members"); TraceSegmentRecordMember.Factory factory = new TraceSegmentRecordMember.Factory(); factory.createWorker(memberContext(), getSelf()); } @@ -32,6 +37,7 @@ public class ApplicationMember extends AbstractMember { @Override public void receive(Object message) throws Throwable { if (message instanceof TraceSegment) { + logger.debug("begin translate TraceSegment Object to JsonObject"); TraceSegment traceSegment = (TraceSegment) message; AbstractMember discoverMember = memberContext().memberFor(TraceSegmentRecordMember.class.getSimpleName()); discoverMember.receive(traceSegment); @@ -67,11 +73,13 @@ public class ApplicationMember extends AbstractMember { } private void sendToNodeInstancePersistence(TraceSegment traceSegment) throws Throwable { - String code = traceSegment.getPrimaryRef().getApplicationCode(); - String address = traceSegment.getPrimaryRef().getPeerHost(); + if (traceSegment.getPrimaryRef() != null) { + String code = traceSegment.getPrimaryRef().getApplicationCode(); + String address = traceSegment.getPrimaryRef().getPeerHost(); - NodeInstancePersistence.Metric property = new NodeInstancePersistence.Metric(code, address); - tell(new NodeInstancePersistence.Factory(), RollingSelector.INSTANCE, property); + NodeInstancePersistence.Metric property = new NodeInstancePersistence.Metric(code, address); + tell(new NodeInstancePersistence.Factory(), RollingSelector.INSTANCE, property); + } } private void sendToResponseCostPersistence(TraceSegment traceSegment) throws Throwable { diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java index 703946cb0..17f0367a8 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java @@ -21,12 +21,12 @@ import java.util.Map; */ public class TraceSegmentRecordMember extends AbstractMember { - public TraceSegmentRecordMember(MemberSystem memberSystem, ActorRef actorRef) { + public TraceSegmentRecordMember(MemberSystem memberSystem, ActorRef actorRef) throws Throwable { super(memberSystem, actorRef); } @Override - public void preStart() throws Throwable { + public void preStart() throws Exception { } @Override @@ -53,11 +53,15 @@ public class TraceSegmentRecordMember extends AbstractMember { traceJsonObj.addProperty("endTime", traceSegment.getEndTime()); traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode()); - JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef()); - traceJsonObj.add("primaryRef", primaryRefJsonObj); + if (traceSegment.getPrimaryRef() != null) { + JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef()); + traceJsonObj.add("primaryRef", primaryRefJsonObj); + } - JsonArray refsJsonArray = parseRefs(traceSegment.getRefs()); - traceJsonObj.add("refs", refsJsonArray); +// if (traceSegment.getRefs() != null) { +// JsonArray refsJsonArray = parseRefs(traceSegment.getRefs()); +// traceJsonObj.add("refs", refsJsonArray); +// } JsonArray spanJsonArray = new JsonArray(); for (Span span : traceSegment.getSpans()) { diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/ApplicationRefMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/ApplicationRefMember.java index d6802a1fa..5221738fd 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/ApplicationRefMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/ApplicationRefMember.java @@ -13,12 +13,12 @@ import com.a.eye.skywalking.trace.TraceSegment; */ public class ApplicationRefMember extends AbstractMember { - public ApplicationRefMember(MemberSystem memberSystem, ActorRef actorRef) { + public ApplicationRefMember(MemberSystem memberSystem, ActorRef actorRef) throws Throwable { super(memberSystem, actorRef); } @Override - public void preStart() throws Throwable { + public void preStart() { } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/TraceSegmentRelationActor.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/TraceSegmentRelationActor.java deleted file mode 100644 index 8308b67ab..000000000 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/metric/TraceSegmentRelationActor.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.a.eye.skywalking.collector.worker.metric; - -import akka.actor.UntypedActor; - -/** - * @author pengys5 - */ -public class TraceSegmentRelationActor extends UntypedActor{ - @Override - public void onReceive(Object message) throws Throwable { - - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java index a66bf5515..c4e2d4207 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java @@ -7,22 +7,28 @@ import com.a.eye.skywalking.collector.worker.WorkerConfig; import com.a.eye.skywalking.collector.worker.application.ApplicationMember; import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMember; import com.a.eye.skywalking.trace.TraceSegment; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * @author pengys5 */ public class TraceSegmentReceiver extends AbstractWorker { + private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class); + @Override public void preStart() throws Exception { ApplicationMember.Factory factory = new ApplicationMember.Factory(); factory.createWorker(memberContext(), getSelf()); + super.preStart(); } @Override public void receive(Object message) throws Throwable { if (message instanceof TraceSegment) { TraceSegment traceSegment = (TraceSegment) message; + logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", traceSegment.getTraceSegmentId()); AbstractMember applicationMember = memberContext().memberFor(ApplicationMember.class.getSimpleName()); applicationMember.receive(traceSegment); @@ -32,7 +38,7 @@ public class TraceSegmentReceiver extends AbstractWorker { } } - public class Factory extends AbstractWorkerProvider { + public static class Factory extends AbstractWorkerProvider { @Override public Class workerClass() { return TraceSegmentReceiver.class; diff --git a/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider b/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider index 8d06ff8e3..9e3945821 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider +++ b/skywalking-collector/skywalking-collector-worker/src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractWorkerProvider @@ -1 +1,8 @@ -com.a.eye.skywalking.collector.actor.SpiTestWorkerFactory \ No newline at end of file +com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver$Factory +com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence$Factory +com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence$Factory +com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence$Factory +com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence$Factory +com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence$Factory +com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence$Factory +com.a.eye.skywalking.collector.worker.applicationref.presistence.DAGNodeRefPersistence$Factory \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config b/skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config index 06d518f56..76989da14 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config +++ b/skywalking-collector/skywalking-collector-worker/src/main/resources/collector.config @@ -1,5 +1,5 @@ -cluster.current.hostname = 192.168.0.1 +cluster.current.hostname = 127.0.0.1 cluster.current.port = 1000 -cluster.current.roles = [Test, Test1] +cluster.current.roles = [TraceSegmentReceiver] -cluster.nodes = [192.168.0.1:1000, 192.168.0.2:1000] \ No newline at end of file +cluster.nodes = ["akka.tcp://CollectorSystem@127.0.0.1:1000"] \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/resources/log4j2.xml b/skywalking-collector/skywalking-collector-worker/src/main/resources/log4j2.xml new file mode 100644 index 000000000..fd313a5d2 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java new file mode 100644 index 000000000..50af36fa9 --- /dev/null +++ b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/StartUpTestCase.java @@ -0,0 +1,54 @@ +package com.a.eye.skywalking.collector.worker; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import com.a.eye.skywalking.collector.actor.WorkerRef; +import com.a.eye.skywalking.collector.actor.WorkersCreator; +import com.a.eye.skywalking.collector.actor.selector.RollingSelector; +import com.a.eye.skywalking.collector.cluster.ClusterConfig; +import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer; +import com.a.eye.skywalking.collector.cluster.WorkersRefCenter; +import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver; +import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory; +import com.a.eye.skywalking.trace.TraceSegment; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.Test; +import org.powermock.api.support.membermodification.MemberModifier; + +import java.util.List; + +/** + * @author pengys5 + */ +public class StartUpTestCase { + + @Test + public void test() throws Exception { + ClusterConfigInitializer.initialize("collector.config"); + System.out.println(ClusterConfig.Cluster.Current.roles); + + final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname). + withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)). + withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)). + withFallback(ConfigFactory.parseString("akka.actor.provider=" + ClusterConfig.Cluster.provider)). + withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)). + withFallback(ConfigFactory.load()); + ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config); + WorkersCreator.INSTANCE.boot(system); + + Thread.sleep(2000); + + for (int i = 0; i < 1; i++) { + TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace(); + + List availableWorks = WorkersRefCenter.INSTANCE.availableWorks(TraceSegmentReceiver.class.getSimpleName()); + WorkerRef workerRef = RollingSelector.INSTANCE.select(availableWorks, traceSegment); + + ActorRef actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRef); + actorRef.tell(traceSegment, ActorRef.noSender()); + } + + Thread.sleep(10000); + } +} -- GitLab