提交 ebb0f9e4 编写于 作者: wu-sheng's avatar wu-sheng

Merge branch 'feature/3.0'

* feature/3.0:
  compile error
  refactor the way how to create worker instance
  1. Remove the primary ref. Only use ref to the parent segment. 2. Adjust globalTracerIds
  refactor cluster module, modify the way use to tell cluster worker
  Add tags test case.
  Add new test case for 3 InvokeContextTest.
  Fix interceptor instance singleton bug, and add some test cases.
  Alter TraceSegement, Add a list of DistributedTraceId as field `relatedGlobalTraces`.
  Shade akka.* in collector-worker. Prevent akka se/deserializing failure.
  Rename ServiceStarter to ServiceManager. Finish the major codes of TraceSegmentProcessQueue and CollectorClientService.
  Sync a result pom
  add recevier moudle
  fix test case issue
  fix tomcat cannot works issue
  fix #103
  Add a new ability to ServiceStarter, about finding a started service instance.
  Fix a test case failure issue.
  Recover logging-api/logging-log4j2 modules. Adjust codes for new modules.
......@@ -53,7 +53,30 @@
<scala.compiler.version>2.11.7</scala.compiler.version>
<powermock.version>1.6.4</powermock.version>
<docker.plugin.version>0.4.13</docker.plugin.version>
<skywalking.version>2.1-2017</skywalking.version>
<shade.package>com.a.eye.skywalking.dependencies</shade.package>
<shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source>
<shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target>
<shade.com.lmax.disruptor.source>com.lmax.disruptor</shade.com.lmax.disruptor.source>
<shade.com.lmax.disruptor.target>${shade.package}.${shade.com.lmax.disruptor.source}</shade.com.lmax.disruptor.target>
<shade.akka.source>akka</shade.akka.source>
<shade.akka.target>${shade.package}.${shade.akka.source}</shade.akka.target>
<shade.scala.source>scala</shade.scala.source>
<shade.scala.target>${shade.package}.${shade.scala.source}</shade.scala.target>
<shade.org.agrona.source>org.agrona</shade.org.agrona.source>
<shade.org.agrona.target>${shade.package}.${shade.org.agrona.source}</shade.org.agrona.target>
<shade.org.jboss.netty.source>org.jboss.netty</shade.org.jboss.netty.source>
<shade.org.jboss.netty.target>${shade.package}.${shade.org.jboss.netty.source}</shade.org.jboss.netty.target>
<shade.org.reactivestreams.source>org.reactivestreams</shade.org.reactivestreams.source>
<shade.org.reactivestreams.target>${shade.package}.${shade.org.reactivestreams.source}</shade.org.reactivestreams.target>
<shade.org.uncommons.maths.source>org.uncommons.maths</shade.org.uncommons.maths.source>
<shade.org.uncommons.maths.target>${shade.package}.${shade.org.uncommons.maths.source}</shade.org.uncommons.maths.target>
<shade.com.google.source>com.google</shade.com.google.source>
<shade.com.google.target>${shade.package}.${shade.com.google.source}</shade.com.google.target>
<shade.io.aeron.source>io.aeron</shade.io.aeron.source>
<shade.io.aeron.target>${shade.package}.${shade.io.aeron.source}</shade.io.aeron.target>
<shade.com.typesafe.source>com.typesafe</shade.com.typesafe.source>
<shade.com.typesafe.target>${shade.package}.${shade.com.typesafe.source}</shade.com.typesafe.target>
</properties>
<dependencies>
......
......@@ -5,6 +5,7 @@
<modules>
<module>skywalking-collector-cluster</module>
<module>skywalking-collector-worker</module>
<module>skywalking-collector-role</module>
</modules>
<parent>
<artifactId>skywalking</artifactId>
......@@ -25,34 +26,9 @@
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-metrics_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
......@@ -60,11 +36,5 @@
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -16,11 +16,6 @@
</properties>
<dependencies>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
......@@ -28,7 +23,7 @@
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-api</artifactId>
<artifactId>skywalking-util</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
package com.a.eye.skywalking.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.UsedRoleNameException;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public class CollectorSystem {
private ILog logger = LogManager.getLogger(CollectorSystem.class);
private ClusterWorkerContext clusterContext;
public ClusterWorkerContext getClusterContext() {
return clusterContext;
}
public void boot() throws Exception {
createAkkaSystem();
createListener();
loadLocalProviders();
createClusterWorker();
}
public void terminate() {
clusterContext.getAkkaSystem().terminate();
}
private void createAkkaSystem() {
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.actor.provider=" + ClusterConfig.Cluster.provider)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)).
withFallback(ConfigFactory.load("application.conf"));
ActorSystem akkaSystem = ActorSystem.create("ClusterSystem", config);
clusterContext = new ClusterWorkerContext(akkaSystem);
}
private void createListener() {
clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WorkName);
}
private void createClusterWorker() throws Exception {
ServiceLoader<AbstractClusterWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractClusterWorkerProvider.class);
for (AbstractClusterWorkerProvider provider : clusterServiceLoader) {
logger.info("create {%s} worker using java service loader", provider.workerNum());
for (int i = 1; i <= provider.workerNum(); i++) {
provider.create(clusterContext, new LocalWorkerContext());
}
}
}
private void loadLocalProviders() throws UsedRoleNameException {
ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
clusterContext.putProvider(provider);
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
/**
* @author pengys5
*/
public abstract class AbstractAsyncMember extends AbstractMember {
private RingBuffer<MessageHolder> ringBuffer;
public AbstractAsyncMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(actorRef);
this.ringBuffer = ringBuffer;
}
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
Object message = event.getMessage();
event.reset();
receive(message);
if (endOfBatch) {
receive(new EndOfBatchCommand());
}
}
public void beTold(Object message) throws Exception {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
} finally {
ringBuffer.publish(sequence);
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
/**
* @author pengys5
*/
public abstract class AbstractClusterWorker extends AbstractWorker {
public AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
static class WorkerWithAkka extends UntypedActor {
private static ILog logger = LogManager.getLogger(WorkerWithAkka.class);
private Cluster cluster;
private final AbstractClusterWorker ownerWorker;
public WorkerWithAkka(AbstractClusterWorker ownerWorker) {
this.ownerWorker = ownerWorker;
cluster = Cluster.get(getContext().system());
}
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
}
@Override
public void postStop() throws Exception {
cluster.unsubscribe(getSelf());
}
/**
* Listening {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
* cluster event, when event send from the member of {@link WorkersListener} then tell
* the sender to register self.
*/
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
register(memberUp.member());
} else {
logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
ownerWorker.work(message);
}
}
/**
* When member role is {@link WorkersListener#WorkName} then Select actor from context
* and send register message to {@link WorkersListener}
*
* @param member is the new created or restart worker
*/
void register(Member member) {
if (member.hasRole(WorkersListener.WorkName)) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(ownerWorker.getRole());
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
}
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import akka.actor.Props;
/**
* @author pengys5
*/
public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWorker> extends AbstractWorkerProvider<T> {
public abstract int workerNum();
@Override
final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());
T clusterWorker = (T) workerInstance(clusterContext);
clusterWorker.preStart();
ActorRef actorRef = clusterContext.getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role() + "_" + num);
ClusterWorkerRef workerRef = new ClusterWorkerRef(actorRef, role());
clusterContext.put(workerRef);
return workerRef;
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
/**
* @author pengys5
*/
public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
static class WorkerWithDisruptor implements EventHandler<MessageHolder> {
private RingBuffer<MessageHolder> ringBuffer;
private AbstractLocalAsyncWorker asyncWorker;
public WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) {
this.ringBuffer = ringBuffer;
this.asyncWorker = asyncWorker;
}
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
try {
Object message = event.getMessage();
event.reset();
asyncWorker.work(message);
if (endOfBatch) {
asyncWorker.work(new EndOfBatchCommand());
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void tell(Object message) throws Exception {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
} finally {
ringBuffer.publish(sequence);
}
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.DaemonThreadFactory;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.queue.MessageHolderFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.lang.reflect.Constructor;
/**
* @author pengys5
*/
public abstract class AbstractAsyncMemberProvider<T extends EventHandler> extends AbstractMemberProvider<T> {
public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAsyncWorker> extends AbstractLocalWorkerProvider<T> {
public abstract int queueSize();
@Override
public T createWorker(ActorRef actorRef) throws Exception {
if (memberClass() == null) {
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
}
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class<?>[]{RingBuffer.class, ActorRef.class});
memberConstructor.setAccessible(true);
final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
T localAsyncWorker = (T) workerInstance(clusterContext);
localAsyncWorker.preStart();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = queueSize();
if (!((((bufferSize - 1) & bufferSize) == 0) && bufferSize != 0)) {
throw new IllegalArgumentException("queue size must be power of 2");
}
// Construct the Disruptor
Disruptor<MessageHolder> disruptor = new Disruptor<MessageHolder>(MessageHolderFactory.INSTANCE, bufferSize, DaemonThreadFactory.INSTANCE);
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
T member = (T) memberConstructor.newInstance(ringBuffer, actorRef);
T.WorkerWithDisruptor disruptorWorker = new T.WorkerWithDisruptor(ringBuffer, localAsyncWorker);
// Connect the handler
disruptor.handleEventsWith(member);
disruptor.handleEventsWith(disruptorWorker);
// Start the Disruptor, starts all threads running
disruptor.start();
return member;
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), disruptorWorker);
localContext.put(workerRef);
return workerRef;
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public abstract class AbstractLocalSyncWorkerProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalWorkerProvider<T> {
@Override
final public WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
T localSyncWorker = (T) workerInstance(clusterContext);
localSyncWorker.preStart();
LocalSyncWorkerRef workerRef = new LocalSyncWorkerRef(role(), localSyncWorker);
localContext.put(workerRef);
return workerRef;
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public abstract class AbstractLocalWorker extends AbstractWorker {
public AbstractLocalWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
}
......@@ -3,8 +3,5 @@ package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public class CollectorBootstrap {
public static void main(String[] args) {
// ActorSystem system = ActorSystem.create("ClusterSystem", config);
}
public abstract class AbstractLocalWorkerProvider<T extends AbstractLocalWorker> extends AbstractWorkerProvider<T> {
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.EventHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
public abstract class AbstractMember implements EventHandler<MessageHolder> {
private Logger logger = LogManager.getFormatterLogger(AbstractMember.class);
private ActorRef actorRef;
private ActorRef getSelf() {
return actorRef;
}
public AbstractMember(ActorRef actorRef) {
this.actorRef = actorRef;
}
protected abstract void beTold(Object message) throws Exception;
/**
* Receive the message to analyse.
*
* @param message is the data send from the forward worker
* @throws Exception is the exception thrown by that worker implementation processing
*/
public abstract void receive(Object message) throws Exception;
/**
* Send analysed data to next Worker.
*
* @param targetWorkerProvider is the worker provider to create worker instance.
* @param selector is the selector to select a same role worker instance form cluster.
* @param message is the data used to send to next worker.
* @throws Exception
*/
public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Exception {
logger.debug("worker provider: %s ,role name: %s", targetWorkerProvider.getClass().getName(), targetWorkerProvider.roleName());
List<WorkerRef> availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName());
selector.select(availableWorks, message).tell(message, getSelf());
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
/**
* @author pengys5
*/
public abstract class AbstractMemberProvider<T> {
public abstract Class memberClass();
public abstract T createWorker(ActorRef actorRef) throws Exception;
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
/**
* @author pengys5
*/
public abstract class AbstractSyncMember extends AbstractMember {
public AbstractSyncMember(ActorRef actorRef) {
super(actorRef);
}
@Override
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
}
@Override
public void beTold(Object message) throws Exception {
receive(message);
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import java.lang.reflect.Constructor;
/**
* @author pengys5
*/
public abstract class AbstractSyncMemberProvider<T> extends AbstractMemberProvider<T> {
@Override
public T createWorker(ActorRef actorRef) throws Exception {
if (memberClass() == null) {
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
}
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class<?>[]{ActorRef.class});
memberConstructor.setAccessible(true);
T member = (T) memberConstructor.newInstance(actorRef);
return member;
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* Abstract implementation of the {@link akka.actor.UntypedActor} that represents an
* analysis unit. <code>AbstractWorker</code> implementation process the message in
* {@link #receive(Object)} method.
* <p>
* <p>
* Subclasses must implement the abstract {@link #receive(Object)} method to process message.
* Subclasses forbid to override the {@link #onReceive(Object)} method.
* <p>
* Here is an example on how to create and use an {@link AbstractWorker}:
* <p>
* {{{
* public class SampleWorker extends AbstractWorker {
*
* @author pengys5
* @Override public void receive(Object message) throws Throwable {
* if (message.equals("Tell Next")) {
* Object sendMessage = new Object();
* tell(new NextSampleWorkerFactory(), RollingSelector.INSTANCE, sendMessage);
* }
* }
* }
* }}}
*/
public abstract class AbstractWorker extends UntypedActor {
public abstract class AbstractWorker {
private final LocalWorkerContext selfContext;
private Logger logger = LogManager.getFormatterLogger(AbstractWorker.class);
private final Role role;
private Cluster cluster = Cluster.get(getContext().system());
private final ClusterWorkerContext clusterContext;
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
public AbstractWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
this.role = role;
this.clusterContext = clusterContext;
this.selfContext = selfContext;
}
@Override
public void postStop() throws Exception {
cluster.unsubscribe(getSelf());
}
public abstract void preStart() throws ProviderNotFountException;
/**
* Receive the message to analyse.
*
* @param message is the data send from the forward worker
* @throws Throwable is the exception thrown by that worker implementation processing
*/
public abstract void receive(Object message) throws Throwable;
public abstract void work(Object message) throws Exception;
/**
* Listening {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
* cluster event, when event send from the member of {@link WorkersListener} then tell
* the sender to register self.
*/
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
register(memberUp.member());
} else {
logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
receive(message);
}
}
/**
* Send analysed data to next Worker.
*
* @param targetWorkerProvider is the worker provider to create worker instance.
* @param selector is the selector to select a same role worker instance form cluster.
* @param message is the data used to send to next worker.
* @throws Throwable
*/
public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Throwable {
List<WorkerRef> availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName());
selector.select(availableWorks, message).tell(message, getSelf());
final public LocalWorkerContext getSelfContext() {
return selfContext;
}
public void tell(AbstractMember targetMember, Object message) throws Exception {
targetMember.beTold(message);
final public ClusterWorkerContext getClusterContext() {
return clusterContext;
}
/**
* When member role is {@link WorkersListener#WorkName} then Select actor from context
* and send register message to {@link WorkersListener}
*
* @param member is the new created or restart worker
*/
void register(Member member) {
if (member.hasRole(WorkersListener.WorkName)) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName());
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
}
final public Role getRole() {
return role;
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The <code>AbstractWorkerProvider</code> should be implemented by any class whose
* instances are intended to provide create instance of the {@link AbstractWorker}.
* The {@link WorkersCreator} use java service loader to load provider implementer,
* so you should config the service file.
* <p>
* Here is an example on how to create and use an {@link AbstractWorkerProvider}:
* <p>
* {{{
* public class SampleWorkerFactory extends AbstractWorkerProvider {
*
* @author pengys5
* @Override public Class workerClass() {
* return SampleWorker.class;
* }
* @Override public int workerNum() {
* return Config.SampleWorkerNum;
* }
* }
* }}}
* <p>
*/
public abstract class AbstractWorkerProvider {
private Logger logger = LogManager.getFormatterLogger(AbstractWorkerProvider.class);
public abstract class AbstractWorkerProvider<T extends AbstractWorker> implements Provider {
public abstract Class workerClass();
public abstract Role role();
public abstract int workerNum();
public abstract T workerInstance(ClusterWorkerContext clusterContext);
public void createWorker(ActorSystem system) {
if (workerClass() == null) {
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from workerClass()");
}
if (workerNum() <= 0) {
throw new IllegalArgumentException("cannot createInstance() with obtained from workerNum() must greater than 0");
}
public abstract WorkerRef onCreate(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException;
for (int i = 1; i <= workerNum(); i++) {
system.actorOf(Props.create(workerClass()), roleName() + "_" + i);
logger.info("create akka actor, actor id is %s", roleName() + "_" + i);
final public WorkerRef create(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFountException {
if (workerInstance(clusterContext) == null) {
throw new IllegalArgumentException("cannot get worker instance with nothing obtained from workerInstance()");
}
}
/**
* Use {@link #workerClass()} method returned class's simple name as a role name.
*
* @return is role of Worker
*/
protected String roleName() {
return workerClass().getSimpleName();
return onCreate(clusterContext, localContext);
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class ClusterWorkerContext extends WorkerContext {
private ILog logger = LogManager.getLogger(ClusterWorkerContext.class);
private final ActorSystem akkaSystem;
private Map<String, AbstractWorkerProvider> providers = new ConcurrentHashMap<>();
public ClusterWorkerContext(ActorSystem akkaSystem) {
this.akkaSystem = akkaSystem;
}
public ActorSystem getAkkaSystem() {
return akkaSystem;
}
@Override
public AbstractWorkerProvider findProvider(Role role) throws ProviderNotFountException {
logger.debug("find role of %s provider from ClusterWorkerContext", role.roleName());
if (providers.containsKey(role.roleName())) {
return providers.get(role.roleName());
} else {
throw new ProviderNotFountException("role=" + role.roleName() + ", no available provider.");
}
}
@Override
public void putProvider(AbstractWorkerProvider provider) throws UsedRoleNameException {
logger.debug("put role of %s provider into ClusterWorkerContext", provider.role().roleName());
if (providers.containsKey(provider.role().roleName())) {
throw new UsedRoleNameException("provider with role=" + provider.role().roleName() + " duplicate each other.");
} else {
providers.put(provider.role().roleName(), provider);
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
/**
* @author pengys5
*/
public class ClusterWorkerRef extends WorkerRef {
private ActorRef actorRef;
public ClusterWorkerRef(ActorRef actorRef, Role role) {
super(role);
this.actorRef = actorRef;
}
@Override
public void tell(Object message) {
actorRef.tell(message, ActorRef.noSender());
}
}
package com.a.eye.skywalking.collector.actor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author pengys5
*/
public enum ClusterWorkerRefCounter {
INSTANCE;
private Map<String, AtomicInteger> counter = new ConcurrentHashMap<>();
public int incrementAndGet(Role role) {
if (!counter.containsKey(role.roleName())) {
AtomicInteger atomic = new AtomicInteger(0);
counter.putIfAbsent(role.roleName(), atomic);
}
return counter.get(role.roleName()).incrementAndGet();
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public interface Context {
AbstractWorkerProvider findProvider(Role role) throws ProviderNotFountException;
void putProvider(AbstractWorkerProvider provider) throws UsedRoleNameException;
WorkerRefs lookup(Role role) throws WorkerNotFountException;
void put(WorkerRef workerRef);
void remove(WorkerRef workerRef);
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public class LocalAsyncWorkerRef extends WorkerRef {
private AbstractLocalAsyncWorker.WorkerWithDisruptor workerWithDisruptor;
public LocalAsyncWorkerRef(Role role, AbstractLocalAsyncWorker.WorkerWithDisruptor workerWithDisruptor) {
super(role);
this.workerWithDisruptor = workerWithDisruptor;
}
@Override
public void tell(Object message) throws Exception {
workerWithDisruptor.tell(message);
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public class LocalSyncWorkerRef extends WorkerRef {
private AbstractLocalSyncWorker localSyncWorker;
public LocalSyncWorkerRef(Role role, AbstractLocalSyncWorker localSyncWorker) {
super(role);
this.localSyncWorker = localSyncWorker;
}
@Override
public void tell(Object message) throws Exception {
localSyncWorker.work(message);
}
}
......@@ -3,15 +3,15 @@ package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public class SpiTestWorkerFactory extends AbstractWorkerProvider {
public class LocalWorkerContext extends WorkerContext {
@Override
public Class workerClass() {
return SpiTestWorker.class;
final public AbstractWorkerProvider findProvider(Role role) throws ProviderNotFountException {
return null;
}
@Override
public int workerNum() {
return 2;
final public void putProvider(AbstractWorkerProvider provider) throws UsedRoleNameException {
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public interface Provider {
WorkerRef create(ClusterWorkerContext clusterContext, LocalWorkerContext localContext) throws Exception;
}
package com.a.eye.skywalking.collector.actor;
public class ProviderNotFountException extends Exception {
public ProviderNotFountException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public interface Role {
String roleName();
WorkerSelector workerSelector();
}
package com.a.eye.skywalking.collector.actor;
public class UsedRoleNameException extends Exception {
public UsedRoleNameException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.actor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public abstract class WorkerContext implements Context {
private Map<String, List<WorkerRef>> roleWorkers = new ConcurrentHashMap<>();
@Override
final public WorkerRefs lookup(Role role) throws WorkerNotFountException {
if (roleWorkers.containsKey(role.roleName())) {
WorkerRefs refs = new WorkerRefs(roleWorkers.get(role.roleName()), role.workerSelector());
return refs;
} else {
throw new WorkerNotFountException("role=" + role.roleName() + ", no available worker.");
}
}
@Override
final public void put(WorkerRef workerRef) {
if (!roleWorkers.containsKey(workerRef.getRole().roleName())) {
roleWorkers.putIfAbsent(workerRef.getRole().roleName(), new ArrayList<WorkerRef>());
}
roleWorkers.get(workerRef.getRole().roleName()).add(workerRef);
}
@Override
final public void remove(WorkerRef workerRef) {
roleWorkers.remove(workerRef.getRole().roleName());
}
}
package com.a.eye.skywalking.collector.actor;
public class WorkerNotFountException extends Exception {
public WorkerNotFountException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The Worker reference
*
* @author pengys5
*/
public class WorkerRef {
private Logger logger = LogManager.getFormatterLogger(WorkerRef.class);
final ActorRef actorRef;
final String workerRole;
public abstract class WorkerRef {
private Role role;
public WorkerRef(ActorRef actorRef, String workerRole) {
this.actorRef = actorRef;
this.workerRole = workerRole;
public WorkerRef(Role role) {
this.role = role;
}
void tell(Object message, ActorRef sender) {
logger.debug("tell %s worker", actorRef.toString());
actorRef.tell(message, sender);
final public Role getRole() {
return role;
}
public ActorPath path() {
return actorRef.path();
}
public String getWorkerRole() {
return workerRole;
}
public abstract void tell(Object message) throws Exception;
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import java.util.List;
/**
* @author pengys5
*/
public class WorkerRefs<T extends WorkerRef> {
private static ILog logger = LogManager.getLogger(WorkerRefs.class);
private List<T> workerRefs;
private WorkerSelector workerSelector;
protected WorkerRefs(List<T> workerRefs, WorkerSelector workerSelector) {
this.workerRefs = workerRefs;
this.workerSelector = workerSelector;
}
public void tell(Object message) throws Exception {
logger.debug("WorkerSelector instance of %s", workerSelector.getClass());
workerSelector.select(workerRefs, message).tell(message);
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ServiceLoader;
/**
* <code>WorkersCreator</code> is a util that use Java Spi to create
* workers by META-INF config file.
*
* @author pengys5
*/
public enum WorkersCreator {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(WorkersCreator.class);
/**
* create worker to use Java Spi.
*
* @param system is create by akka {@link ActorSystem}
*/
public void boot(ActorSystem system) {
system.actorOf(Props.create(WorkersListener.class), WorkersListener.WorkName);
ServiceLoader<AbstractWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractWorkerProvider.class);
for (AbstractWorkerProvider provider : clusterServiceLoader) {
logger.info("create worker {%s} using java service loader", provider.workerClass().getName());
provider.createWorker(system);
}
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} by message hashcode, so it can use to send the same hashcode
* message to same {@link WorkerRef}. Usually, use to database operate which avoid dirty data.
*
* @author pengys5
*/
public enum HashCodeSelector implements WorkerSelector {
INSTANCE;
public class HashCodeSelector implements WorkerSelector<WorkerRef> {
/**
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
if (message instanceof AbstractHashMessage) {
AbstractHashMessage hashMessage = (AbstractHashMessage) message;
int size = members.size();
int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return members.get(selectIndex);
} else {
throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage");
}
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} nearly random, by round-robin.
*
* @author wusheng
* @author pengys5
*/
public enum RollingSelector implements WorkerSelector {
INSTANCE;
public class RollingSelector implements WorkerSelector<WorkerRef> {
/**
* A simple round variable.
*/
private int index = 0;
/**
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
int size = members.size();
......
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>WorkerSelector</code> should be implemented
* by any class whose instances are intended to provide select a {@link WorkerRef} from a {@link WorkerRef} list.
* <p></p>
* Actually, the <code>WorkerRef</code> is designed to provide a routing ability in the collector cluster.
*
* @author wusheng
* @author pengys5
*/
public interface WorkerSelector {
/**
* select a {@link WorkerRef} from a {@link WorkerRef} list.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
WorkerRef select(List<WorkerRef> members, Object message);
public interface WorkerSelector<T extends WorkerRef> {
T select(List<T> members, Object message);
}
......@@ -2,9 +2,8 @@ package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.api.util.ConfigInitializer;
import com.a.eye.skywalking.api.util.StringUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import java.io.InputStream;
import java.util.Properties;
......@@ -21,7 +20,7 @@ import java.util.Properties;
*/
public class ClusterConfigInitializer {
private static Logger logger = LogManager.getFormatterLogger(ClusterConfigInitializer.class);
private static ILog logger = LogManager.getLogger(ClusterConfigInitializer.class);
public static final String ConfigFileName = "collector.config";
......
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.collector.actor.Role;
import java.io.Serializable;
/**
......@@ -12,14 +14,14 @@ import java.io.Serializable;
public class WorkerListenerMessage {
public static class RegisterMessage implements Serializable {
public final String workRole;
private final Role role;
public RegisterMessage(String workRole) {
this.workRole = workRole;
public RegisterMessage(Role role) {
this.role = role;
}
public String getWorkRole() {
return workRole;
public Role getRole() {
return role;
}
}
}
......@@ -5,8 +5,14 @@ import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.ClusterWorkerRef;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* <code>WorkersListener</code> listening the register message from workers
......@@ -14,20 +20,28 @@ import org.apache.logging.log4j.Logger;
* and terminated message from akka cluster.
* <p>
* when listened register message then begin to watch the state for this worker
* and register to {@link WorkersRefCenter}.
* and register to {@link ClusterWorkerContext} and {@link #relation}.
* <p>
* when listened terminate message then unregister from {@link WorkersRefCenter}.
* when listened terminate message then unregister from {@link ClusterWorkerContext} and {@link #relation} .
*
* @author pengys5
*/
public class WorkersListener extends UntypedActor {
private Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
private ILog logger = LogManager.getLogger(WorkersListener.class);
public static final String WorkName = "WorkersListener";
private Cluster cluster = Cluster.get(getContext().system());
private Map<ActorRef, ClusterWorkerRef> relation = new ConcurrentHashMap<>();
private final ClusterWorkerContext clusterContext;
public WorkersListener(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
......@@ -38,14 +52,26 @@ public class WorkersListener extends UntypedActor {
if (message instanceof WorkerListenerMessage.RegisterMessage) {
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
ActorRef sender = getSender();
logger.info("register worker of role: %s, path: %s", register.getWorkRole(), sender.toString());
WorkersRefCenter.INSTANCE.register(sender, register.getWorkRole());
// logger.info("register worker of role: %s, path: %s", register.getWorkRole(), sender.toString());
ClusterWorkerRef workerRef = new ClusterWorkerRef(sender, register.getRole());
relation.put(sender, workerRef);
clusterContext.put(new ClusterWorkerRef(sender, register.getRole()));
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
WorkersRefCenter.INSTANCE.unregister(terminated.getActor());
clusterContext.remove(relation.get(terminated.getActor()));
relation.remove(terminated.getActor());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember) message;
WorkersRefCenter.INSTANCE.unregister(unreachableMember.member().address());
Iterator<Map.Entry<ActorRef, ClusterWorkerRef>> iterator = relation.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<ActorRef, ClusterWorkerRef> next = iterator.next();
if (next.getKey().path().address().equals(unreachableMember.member().address())) {
clusterContext.remove(next.getValue());
iterator.remove();
}
}
} else {
unhandled(message);
}
......
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.Address;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* <code>WorkersRefCenter</code> represent a cache center,
* store all {@link ActorRef}s, each of them represent a Akka Actor instance.
* All the Actors in this JVM, can find alive-actor in here, and send message.
*
* @author wusheng
*/
public enum WorkersRefCenter {
INSTANCE;
private Map<String, List<WorkerRef>> roleToWorkerRef = new ConcurrentHashMap();
private Map<ActorRef, WorkerRef> actorRefToWorkerRef = new ConcurrentHashMap<>();
public void register(ActorRef newActorRef, String workerRole) {
if (!roleToWorkerRef.containsKey(workerRole)) {
List<WorkerRef> actorList = Collections.synchronizedList(new ArrayList<WorkerRef>());
roleToWorkerRef.putIfAbsent(workerRole, actorList);
}
WorkerRef newWorkerRef = new WorkerRef(newActorRef, workerRole);
roleToWorkerRef.get(workerRole).add(newWorkerRef);
actorRefToWorkerRef.put(newActorRef, newWorkerRef);
}
public void unregister(ActorRef oldActorRef) {
WorkerRef oldWorkerRef = actorRefToWorkerRef.get(oldActorRef);
roleToWorkerRef.get(oldWorkerRef.getWorkerRole()).remove(oldWorkerRef);
actorRefToWorkerRef.remove(oldActorRef);
}
public void unregister(Address address) {
Iterator<ActorRef> actorRefToWorkerRefIterator = actorRefToWorkerRef.keySet().iterator();
while (actorRefToWorkerRefIterator.hasNext()) {
if (address.equals(actorRefToWorkerRefIterator.next().path().address())) {
actorRefToWorkerRefIterator.remove();
}
}
Iterator<Map.Entry<String, List<WorkerRef>>> roleToWorkerRefIterator = roleToWorkerRef.entrySet().iterator();
while (roleToWorkerRefIterator.hasNext()) {
List<WorkerRef> workerRefList = roleToWorkerRefIterator.next().getValue();
Iterator<WorkerRef> workerRefIterator = workerRefList.iterator();
while (workerRefIterator.hasNext()) {
if (workerRefIterator.next().path().address().equals(address)) {
workerRefIterator.remove();
}
}
}
}
/**
* Get all available {@link WorkerRef} list, by the given worker role.
*
* @param workerRole the given role
* @return available {@link WorkerRef} list
* @throws NoAvailableWorkerException , when no available worker.
*/
public List<WorkerRef> availableWorks(String workerRole) throws NoAvailableWorkerException {
List<WorkerRef> refs = roleToWorkerRef.get(workerRole);
if (refs == null || refs.size() == 0) {
throw new NoAvailableWorkerException("role=" + workerRole + ", no available worker.");
}
return Collections.unmodifiableList(refs);
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class AbstractWorkerProviderTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
}
@Test(expected = IllegalArgumentException.class)
public void testCreateWorkerWhenWorkerClassIsNull() {
AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() {
@Override
public Class workerClass() {
return Object.class;
}
@Override
public int workerNum() {
return 1;
}
};
aWorkerProvider.createWorker(system);
}
@Test(expected = IllegalArgumentException.class)
public void testCreateWorkerWhenWorkerNumLessThan_1() {
AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() {
@Override
public Class workerClass() {
return null;
}
@Override
public int workerNum() {
return 0;
}
};
aWorkerProvider.createWorker(system);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
/**
* @author pengys5
*/
public class SpiTestWorker extends AbstractWorker {
@Override
public void receive(Object message) throws Throwable {
if (message.equals("Test1")) {
getSender().tell("Yes", getSelf());
} else if (message.equals("Test2")) {
getSender().tell("No", getSelf());
} else if (message.equals("Test3")) {
Object sendMessage = new Object();
tell(new SpiTestWorkerFactory(), RollingSelector.INSTANCE, sendMessage);
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class SpiTestWorkerFactoryTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
}
@Test
public void testCreateWorker() {
new JavaTestKit(system) {{
SpiTestWorkerFactory aWorkerProvider = new SpiTestWorkerFactory();
aWorkerProvider.createWorker(system);
system.actorSelection("/user/" + aWorkerProvider.roleName() + "_1").tell("Test1", getRef());
expectMsgEquals(duration("1 second"), "Yes");
system.actorSelection("/user/" + aWorkerProvider.roleName() + "_2").tell("Test2", getRef());
expectMsgEquals(duration("1 second"), "No");
}};
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestClusterWorker extends AbstractClusterWorker {
public TestClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(TestLocalSyncWorker.TestLocalSyncWorkerRole.INSTANCE).create(getClusterContext(), getSelfContext());
getClusterContext().findProvider(TestLocalAsyncWorker.TestLocalASyncWorkerRole.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void work(Object message) throws Exception {
if (message.equals("Print")) {
System.out.println(message);
} else if (message.equals("TellLocalWorker")) {
System.out.println(message);
getSelfContext().lookup(TestLocalSyncWorker.TestLocalSyncWorkerRole.INSTANCE).tell(message);
} else if (message.equals("TellLocalAsyncWorker")) {
System.out.println(message);
getSelfContext().lookup(TestLocalAsyncWorker.TestLocalASyncWorkerRole.INSTANCE).tell(message);
} else {
System.out.println("unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<TestClusterWorker> {
@Override
public int workerNum() {
return 5;
}
@Override
public Role role() {
return TestClusterWorkerRole.INSTANCE;
}
@Override
public TestClusterWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestClusterWorker(role(), clusterContext, new LocalWorkerContext());
}
}
public enum TestClusterWorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestClusterWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.CollectorSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class TestClusterWorkerTestCase {
private CollectorSystem collectorSystem;
public void createSystem() throws Exception {
collectorSystem = new CollectorSystem();
collectorSystem.boot();
}
public void terminateSystem() {
collectorSystem.terminate();
}
public void testTellWorker() throws Exception {
WorkerRefs workerRefs = collectorSystem.getClusterContext().lookup(TestClusterWorker.TestClusterWorkerRole.INSTANCE);
workerRefs.tell("Print");
workerRefs.tell("TellLocalWorker");
workerRefs.tell("TellLocalAsyncWorker");
Thread.sleep(5000);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestLocalAsyncWorker extends AbstractLocalAsyncWorker {
public TestLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFountException {
}
@Override
public void work(Object message) throws Exception {
if (message.equals("TellLocalAsyncWorker")) {
System.out.println("hello async!");
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<TestLocalAsyncWorker> {
@Override
public int queueSize() {
return 1024;
}
@Override
public Role role() {
return TestLocalASyncWorkerRole.INSTANCE;
}
@Override
public TestLocalAsyncWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestLocalAsyncWorker(role(), clusterContext, new LocalWorkerContext());
}
}
public enum TestLocalASyncWorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestLocalAsyncWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestLocalSyncWorker extends AbstractLocalSyncWorker {
public TestLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFountException {
}
@Override
public void work(Object message) throws Exception {
if (message.equals("TellLocalWorker")) {
System.out.println("hello! ");
} else {
System.out.println("unhandled");
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<TestLocalSyncWorker> {
@Override
public Role role() {
return TestLocalSyncWorkerRole.INSTANCE;
}
@Override
public TestLocalSyncWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestLocalSyncWorker(role(), clusterContext, new LocalWorkerContext());
}
}
public enum TestLocalSyncWorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestLocalSyncWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.CollectorSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class TestLocalSyncWorkerTestCase {
@Before
public void createSystem() throws Exception {
}
@After
public void terminateSystem() {
}
@Test
public void testTellWorker() throws Exception {
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class WorkersCreatorTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
}
@Test
public void testBoot() {
new JavaTestKit(system) {{
WorkersCreator.INSTANCE.boot(system);
system.actorSelection("/user/SpiTestWorker_1").tell("Test1", getRef());
expectMsgEquals(duration("1 second"), "Yes");
system.actorSelection("/user/SpiTestWorker_2").tell("Test2", getRef());
expectMsgEquals(duration("1 second"), "No");
}};
}
}
package com.a.eye.skywalking.collector.cluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class CollectorConfigTestCase {
@Before
public void resetArguments() {
System.clearProperty("cluster.current.hostname");
System.clearProperty("cluster.current.port");
System.clearProperty("cluster.current.roles");
System.clearProperty("cluster.nodes");
}
@Test
public void testInitializeUseConfigFile() {
ClusterConfigInitializer.initialize("collector.config");
Assert.assertEquals("192.168.0.1", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1000", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("[Test, Test1]", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("[192.168.0.1:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes);
}
@Test
public void testInitializeUseArguments() {
System.setProperty("cluster.current.hostname", "192.168.0.2");
System.setProperty("cluster.current.port", "1001");
System.setProperty("cluster.current.roles", "Test3, Test4");
System.setProperty("cluster.nodes", "[192.168.0.2:1000, 192.168.0.2:1000]");
ClusterConfigInitializer.initialize("collector.config");
Assert.assertEquals("192.168.0.2", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1001", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("Test3, Test4", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("[192.168.0.2:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes);
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import scala.concurrent.Future;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class WorkerListenerTestCase {
ActorSystem system;
TestActorRef<WorkersListener> senderActorRef;
TestActorRef<WorkersListener> receiveactorRef;
@Before
public void initData() {
system = ActorSystem.create();
final Props props = Props.create(WorkersListener.class);
senderActorRef = TestActorRef.create(system, props, "WorkersListenerSender");
receiveactorRef = TestActorRef.create(system, props, "WorkersListenerReceive");
WorkerListenerMessage.RegisterMessage message = new WorkerListenerMessage.RegisterMessage("WorkersListener");
receiveactorRef.tell(message, senderActorRef);
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
}
@Test
public void testRegister() throws IllegalAccessException {
Map<ActorRef, WorkerRef> actorRefToWorkerRef = (Map<ActorRef, WorkerRef>) MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").get(WorkersRefCenter.INSTANCE);
ActorRef senderRefInWorkerRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(actorRefToWorkerRef.get(senderActorRef));
Assert.assertEquals(senderActorRef, senderRefInWorkerRef);
Map<String, List<WorkerRef>> roleToWorkerRef = (Map<String, List<WorkerRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").get(WorkersRefCenter.INSTANCE);
WorkerRef workerRef = roleToWorkerRef.get("WorkersListener").get(0);
senderRefInWorkerRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRef);
Assert.assertEquals(senderActorRef, senderRefInWorkerRef);
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class WorkersRefCenterTestCase {
ActorSystem system;
TestActorRef<WorkersListener> actorRef1;
TestActorRef<WorkersListener> actorRef2;
TestActorRef<WorkersListener> actorRef3;
@Before
public void createSystem() {
system = ActorSystem.create();
final Props props = Props.create(WorkersListener.class);
actorRef1 = TestActorRef.create(system, props, "WorkersListener1");
actorRef2 = TestActorRef.create(system, props, "WorkersListener2");
actorRef3 = TestActorRef.create(system, props, "WorkersListener3");
WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener");
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
}
@Test
public void testRegister() throws IllegalAccessException {
Map<String, List<WorkerRef>> roleToActor = (Map<String, List<WorkerRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").get(WorkersRefCenter.INSTANCE);
List<WorkerRef> workerRefs = roleToActor.get("WorkersListener");
ActorRef actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRefs.get(0));
Assert.assertEquals(actorRef1, actorRef);
actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRefs.get(1));
Assert.assertEquals(actorRef2, actorRef);
actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRefs.get(2));
Assert.assertEquals(actorRef3, actorRef);
Map<ActorRef, WorkerRef> actorToRole = (Map<ActorRef, WorkerRef>) MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").get(WorkersRefCenter.INSTANCE);
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef1).getWorkerRole());
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef2).getWorkerRole());
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef3).getWorkerRole());
}
@Test
public void testUnRegister() throws IllegalAccessException {
WorkersRefCenter.INSTANCE.unregister(actorRef1);
Map<String, List<WorkerRef>> roleToWorkerRef = (Map<String, List<WorkerRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").get(WorkersRefCenter.INSTANCE);
ActorRef actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(roleToWorkerRef.get("WorkersListener").get(0));
Assert.assertEquals(actorRef2, actorRef);
actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(roleToWorkerRef.get("WorkersListener").get(1));
Assert.assertEquals(actorRef3, actorRef);
Map<ActorRef, WorkerRef> actorRefToWorkerRef = (Map<ActorRef, WorkerRef>) MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").get(WorkersRefCenter.INSTANCE);
Assert.assertEquals(null, actorRefToWorkerRef.get(actorRef1));
}
@Test
public void testSizeOf() throws NoAvailableWorkerException {
Assert.assertEquals(3, WorkersRefCenter.INSTANCE.availableWorks("WorkersListener").size());
}
}
com.a.eye.skywalking.collector.actor.TestLocalSyncWorker$Factory
com.a.eye.skywalking.collector.actor.TestLocalAsyncWorker$Factory
\ No newline at end of file
......@@ -5,15 +5,33 @@ akka {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
// data = "com.a.eye.skywalking.collector.worker.TraceSegmentSerializer"
// json = "com.a.eye.skywalking.collector.worker.JsonSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
// "java.io.Serializable" = none
// "com.a.eye.skywalking.messages.ISerializable" = data
// "com.google.gson.JsonObject" = json
// "java.io.Serializable" = none
}
// serialize-messages = on
// serialize-messages = on
warn-about-java-serializer-usage = on
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 1000
}
}
cluster {
auto-down-unreachable-after = off
metrics.enabled = off
}
}
\ No newline at end of file
cluster.current.hostname = 192.168.0.1
cluster.current.hostname = 127.0.0.1
cluster.current.port = 1000
cluster.current.roles = [Test, Test1]
cluster.nodes = [192.168.0.1:1000, 192.168.0.2:1000]
\ No newline at end of file
cluster.nodes=["akka.tcp://CollectorSystem@127.0.0.1:1000", "akka.tcp://CollectorSystem@127.0.0.1:1001", "akka.tcp://CollectorSystem@127.0.0.1:1002"]
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n" />
</Console>
</Appenders>
<Loggers>
<Root level="info">
<Root level="debug">
<AppenderRef ref="Console" />
</Root>
</Loggers>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>skywalking-collector</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-collector-role</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.a.eye.skywalking.collector.role;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public enum TraceSegmentReceiverRole implements Role {
INSTANCE;
@Override
public String roleName() {
return "TraceSegmentReceiver";
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
......@@ -18,6 +18,26 @@
<artifactId>skywalking-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-trace</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-log4j2</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-collector-role</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.nanohttpd</groupId>
<artifactId>nanohttpd</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
......@@ -35,4 +55,35 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<relocations>
<relocation>
<pattern>${shade.akka.source}</pattern>
<shadedPattern>${shade.akka.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMember;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class AnalysisMember extends AbstractAsyncMember {
public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(AnalysisMember.class);
public AnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public AnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public abstract void analyse(Object message) throws Exception;
@Override
public void receive(Object message) throws Exception {
public void preStart() throws ProviderNotFountException {
}
@Override
public void work(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.actor.WorkersCreator;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.NoAvailableWorkerException;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.logging.log4j2.Log4j2Resolver;
import com.a.eye.skywalking.collector.worker.httpserver.HttpServer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.UnknownHostException;
/**
* @author pengys5
*/
public class CollectorBootStartUp {
public static void main(String[] args) throws NoAvailableWorkerException, InterruptedException, UnknownHostException {
/**
* TODO pengys5, make the exception clear.
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
LogManager.setLogResolver(new Log4j2Resolver());
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
......@@ -26,8 +30,9 @@ public class CollectorBootStartUp {
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)).
withFallback(ConfigFactory.load("application.conf"));
ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
WorkersCreator.INSTANCE.boot(system);
EsClient.boot();
// ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
// WorkersCreator.INSTANCE.boot(system);
HttpServer.INSTANCE.boot();
// EsClient.boot();
}
}
package com.a.eye.skywalking.collector.worker;
import akka.serialization.JSerializer;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* @author pengys5
......@@ -23,14 +20,12 @@ public class JsonSerializer extends JSerializer {
@Override
public byte[] toBinary(Object o) {
// System.out.println("Json toBinary");
JsonObject jsonObject = (JsonObject) o;
return jsonObject.toString().getBytes();
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
// System.out.println("Json fromBinaryJava");
Gson gson = new Gson();
return gson.fromJson(new String(bytes), JsonObject.class);
}
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
......@@ -17,8 +20,8 @@ public abstract class MetricAnalysisMember extends AnalysisMember {
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricAnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public MetricAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public void setMetric(String id, int second, Long value) throws Exception {
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
......@@ -28,8 +28,8 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricPersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public MetricPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMember;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class PersistenceMember extends AbstractAsyncMember {
public abstract class PersistenceMember extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(PersistenceMember.class);
public PersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public PersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public abstract String esIndex();
......@@ -26,7 +23,12 @@ public abstract class PersistenceMember extends AbstractAsyncMember {
public abstract void analyse(Object message) throws Exception;
@Override
public void receive(Object message) throws Exception {
public void preStart() throws ProviderNotFountException {
}
@Override
public void work(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
persistence();
} else {
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
......@@ -18,8 +21,8 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
private RecordPersistenceData persistenceData = new RecordPersistenceData();
public RecordAnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public RecordAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public void setRecord(String id, JsonObject record) throws Exception {
......
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
......@@ -24,8 +24,8 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
protected RecordPersistenceData persistenceData = new RecordPersistenceData();
public RecordPersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public RecordPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......
package com.a.eye.skywalking.collector.worker.application;
import akka.actor.ActorRef;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.AbstractSyncMember;
import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.application.analysis.DAGNodeAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.NodeInstanceAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseCostAnalysis;
......@@ -11,40 +11,40 @@ import com.a.eye.skywalking.collector.worker.application.analysis.ResponseSummar
import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.a.eye.skywalking.trace.tag.Tags;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ApplicationMain extends AbstractSyncMember {
public class ApplicationMain extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(ApplicationMain.class);
private DAGNodeAnalysis dagNodeAnalysis;
private NodeInstanceAnalysis nodeInstanceAnalysis;
private ResponseCostAnalysis responseCostAnalysis;
private ResponseSummaryAnalysis responseSummaryAnalysis;
private TraceSegmentRecordPersistence recordPersistence;
public ApplicationMain(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public ApplicationMain(ActorRef actorRef) throws Exception {
super(actorRef);
dagNodeAnalysis = DAGNodeAnalysis.Factory.INSTANCE.createWorker(actorRef);
nodeInstanceAnalysis = NodeInstanceAnalysis.Factory.INSTANCE.createWorker(actorRef);
responseCostAnalysis = ResponseCostAnalysis.Factory.INSTANCE.createWorker(actorRef);
responseSummaryAnalysis = ResponseSummaryAnalysis.Factory.INSTANCE.createWorker(actorRef);
recordPersistence = TraceSegmentRecordPersistence.Factory.INSTANCE.createWorker(actorRef);
@Override
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(DAGNodeAnalysis.Role.INSTANCE).create(getClusterContext(), getSelfContext());
getClusterContext().findProvider(NodeInstanceAnalysis.Role.INSTANCE).create(getClusterContext(), getSelfContext());
getClusterContext().findProvider(ResponseCostAnalysis.Role.INSTANCE).create(getClusterContext(), getSelfContext());
getClusterContext().findProvider(ResponseSummaryAnalysis.Role.INSTANCE).create(getClusterContext(), getSelfContext());
getClusterContext().findProvider(TraceSegmentRecordPersistence.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void receive(Object message) throws Exception {
public void work(Object message) throws Exception {
if (message instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
logger.debug("begin translate TraceSegment Object to JsonObject");
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
recordPersistence.beTold(traceSegment);
getSelfContext().lookup(TraceSegmentRecordPersistence.Role.INSTANCE).tell(traceSegment);
sendToDAGNodePersistence(traceSegment);
sendToNodeInstanceAnalysis(traceSegment);
......@@ -53,12 +53,31 @@ public class ApplicationMain extends AbstractSyncMember {
}
}
public static class Factory extends AbstractSyncMemberProvider<ApplicationMain> {
public static class Factory extends AbstractLocalSyncWorkerProvider<ApplicationMain> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ApplicationMain.class;
public Role role() {
return null;
}
@Override
public ApplicationMain workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationMain(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ApplicationMain.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
......@@ -75,18 +94,21 @@ public class ApplicationMain extends AbstractSyncMember {
}
DAGNodeAnalysis.Metric node = new DAGNodeAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, component, layer);
dagNodeAnalysis.beTold(node);
getSelfContext().lookup(DAGNodeAnalysis.Role.INSTANCE).tell(node);
}
private void sendToNodeInstanceAnalysis(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
TraceSegmentRef traceSegmentRef = traceSegment.getTraceSegment().getPrimaryRef();
TraceSegment segment = traceSegment.getTraceSegment();
List<TraceSegmentRef> refs = segment.getRefs();
if (traceSegmentRef != null && !StringUtil.isEmpty(traceSegmentRef.getApplicationCode())) {
String code = traceSegmentRef.getApplicationCode();
String address = traceSegmentRef.getPeerHost();
if (refs != null) {
for (TraceSegmentRef ref : refs) {
String code = segment.getApplicationCode();
String address = ref.getPeerHost();
NodeInstanceAnalysis.Metric property = new NodeInstanceAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, address);
nodeInstanceAnalysis.beTold(property);
getSelfContext().lookup(NodeInstanceAnalysis.Role.INSTANCE).tell(property);
}
}
}
......@@ -106,7 +128,7 @@ public class ApplicationMain extends AbstractSyncMember {
}
ResponseCostAnalysis.Metric cost = new ResponseCostAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError, startTime, endTime);
responseCostAnalysis.beTold(cost);
getSelfContext().lookup(ResponseCostAnalysis.Role.INSTANCE).tell(cost);
}
private void sendToResponseSummaryPersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
......@@ -120,6 +142,6 @@ public class ApplicationMain extends AbstractSyncMember {
}
ResponseSummaryAnalysis.Metric summary = new ResponseSummaryAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError);
responseSummaryAnalysis.beTold(summary);
getSelfContext().lookup(ResponseSummaryAnalysis.Role.INSTANCE).tell(summary);
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -21,8 +21,8 @@ public class DAGNodeAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeAnalysis.class);
public DAGNodeAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public DAGNodeAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -47,16 +47,21 @@ public class DAGNodeAnalysis extends RecordAnalysisMember {
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
tell(DAGNodeReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
getClusterContext().lookup(DAGNodeReceiver.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeAnalysis> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodeAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeAnalysis.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public DAGNodeAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -65,6 +70,20 @@ public class DAGNodeAnalysis extends RecordAnalysisMember {
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return DAGNodeAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final String component;
......
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.NodeInstanceReceiver;
......@@ -11,7 +12,6 @@ import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -22,8 +22,8 @@ public class NodeInstanceAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceAnalysis.class);
public NodeInstanceAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public NodeInstanceAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -47,16 +47,21 @@ public class NodeInstanceAnalysis extends RecordAnalysisMember {
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
tell(NodeInstanceReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
getClusterContext().lookup(NodeInstanceReceiver.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<NodeInstanceAnalysis> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeInstanceAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return NodeInstanceAnalysis.class;
public Role role() {
return null;
}
@Override
public NodeInstanceAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -65,7 +70,21 @@ public class NodeInstanceAnalysis extends RecordAnalysisMember {
}
}
public static class Metric extends AbstractTimeSlice{
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstanceAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final String address;
......
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -20,8 +20,8 @@ public class ResponseCostAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostAnalysis.class);
public ResponseCostAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public ResponseCostAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -41,16 +41,21 @@ public class ResponseCostAnalysis extends MetricAnalysisMember {
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
tell(ResponseCostReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneMetric);
getClusterContext().lookup(ResponseCostReceiver.Role.INSTANCE).tell(oneMetric);
}
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseCostAnalysis> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseCostAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseCostAnalysis.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public ResponseCostAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseCostAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -59,6 +64,20 @@ public class ResponseCostAnalysis extends MetricAnalysisMember {
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseCostAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final Boolean isError;
......
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseSummaryReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -20,8 +20,8 @@ public class ResponseSummaryAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryAnalysis.class);
public ResponseSummaryAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public ResponseSummaryAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -38,16 +38,21 @@ public class ResponseSummaryAnalysis extends MetricAnalysisMember {
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
tell(ResponseSummaryReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneMetric);
getClusterContext().lookup(ResponseSummaryReceiver.Role.INSTANCE).tell(oneMetric);
}
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseSummaryAnalysis> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseSummaryAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseSummaryAnalysis.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public ResponseSummaryAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseSummaryAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -56,6 +61,20 @@ public class ResponseSummaryAnalysis extends MetricAnalysisMember {
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseSummaryAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final Boolean isError;
......
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -17,8 +17,8 @@ public class DAGNodePersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodePersistence.class);
public DAGNodePersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public DAGNodePersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -31,12 +31,17 @@ public class DAGNodePersistence extends RecordPersistenceMember {
return "dag_node";
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodePersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodePersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodePersistence.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public DAGNodePersistence workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodePersistence(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -44,4 +49,18 @@ public class DAGNodePersistence extends RecordPersistenceMember {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return DAGNodePersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -16,8 +17,8 @@ public class NodeInstancePersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(NodeInstancePersistence.class);
public NodeInstancePersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public NodeInstancePersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -30,12 +31,17 @@ public class NodeInstancePersistence extends RecordPersistenceMember {
return "node_instance";
}
public static class Factory extends AbstractAsyncMemberProvider<NodeInstancePersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeInstancePersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return NodeInstancePersistence.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstancePersistence workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstancePersistence(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -43,4 +49,18 @@ public class NodeInstancePersistence extends RecordPersistenceMember {
return WorkerConfig.Queue.Persistence.NodeInstancePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstancePersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -16,8 +17,8 @@ public class ResponseCostPersistence extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostPersistence.class);
public ResponseCostPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public ResponseCostPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -30,12 +31,17 @@ public class ResponseCostPersistence extends MetricPersistenceMember {
return "response_cost";
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseCostPersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseCostPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseCostPersistence.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public ResponseCostPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseCostPersistence(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -43,4 +49,18 @@ public class ResponseCostPersistence extends MetricPersistenceMember {
return WorkerConfig.Queue.Persistence.ResponseCostPersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseCostPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -16,8 +17,8 @@ public class ResponseSummaryPersistence extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryPersistence.class);
public ResponseSummaryPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public ResponseSummaryPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -30,12 +31,17 @@ public class ResponseSummaryPersistence extends MetricPersistenceMember {
return "response_summary";
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseSummaryPersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseSummaryPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseSummaryPersistence.class;
public Role role() {
return null;
}
@Override
public ResponseSummaryPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseSummaryPersistence(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -43,4 +49,18 @@ public class ResponseSummaryPersistence extends MetricPersistenceMember {
return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseSummaryPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
......@@ -14,7 +16,6 @@ import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -38,8 +39,8 @@ public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
return "trace_segment";
}
public TraceSegmentRecordPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public TraceSegmentRecordPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -54,17 +55,36 @@ public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
}
}
public static class Factory extends AbstractAsyncMemberProvider<TraceSegmentRecordPersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<TraceSegmentRecordPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
}
@Override
public Class memberClass() {
return TraceSegmentRecordPersistence.class;
public TraceSegmentRecordPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new TraceSegmentRecordPersistence(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return TraceSegmentRecordPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
......@@ -76,11 +96,6 @@ public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
traceJsonObj.addProperty("endTime", traceSegment.getEndTime());
traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode());
if (traceSegment.getPrimaryRef() != null) {
JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef());
traceJsonObj.add("primaryRef", primaryRefJsonObj);
}
if (traceSegment.getRefs() != null) {
JsonArray refsJsonArray = parseRefs(traceSegment.getRefs());
traceJsonObj.add("refs", refsJsonArray);
......@@ -96,15 +111,6 @@ public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
return traceJsonObj;
}
private JsonObject parsePrimaryRef(TraceSegmentRef primaryRef) {
JsonObject primaryRefJsonObj = new JsonObject();
primaryRefJsonObj.addProperty("appCode", primaryRef.getApplicationCode());
primaryRefJsonObj.addProperty("spanId", primaryRef.getSpanId());
primaryRefJsonObj.addProperty("peerHost", primaryRef.getPeerHost());
primaryRefJsonObj.addProperty("segmentId", primaryRef.getTraceSegmentId());
return primaryRefJsonObj;
}
private JsonArray parseRefs(List<TraceSegmentRef> refs) {
JsonArray refsJsonArray = new JsonArray();
for (TraceSegmentRef ref : refs) {
......
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -11,33 +12,39 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeReceiver extends AbstractWorker {
public class DAGNodeReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeReceiver.class);
private DAGNodePersistence persistence;
public DAGNodeReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws Exception {
super.preStart();
persistence = DAGNodePersistence.Factory.INSTANCE.createWorker(getSelf());
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(DAGNodePersistence.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void receive(Object message) throws Throwable {
public void work(Object message) throws Exception {
if (message instanceof RecordData) {
persistence.beTold(message);
getSelfContext().lookup(DAGNodePersistence.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractClusterWorkerProvider<DAGNodeReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return DAGNodeReceiver.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public DAGNodeReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeReceiver(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -45,4 +52,18 @@ public class DAGNodeReceiver extends AbstractWorker {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return DAGNodeReceiver.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -11,33 +12,39 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstanceReceiver extends AbstractWorker {
public class NodeInstanceReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceReceiver.class);
private NodeInstancePersistence persistence;
public NodeInstanceReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws Exception {
super.preStart();
persistence = NodeInstancePersistence.Factory.INSTANCE.createWorker(getSelf());
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(NodeInstancePersistence.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void receive(Object message) throws Throwable {
public void work(Object message) throws Exception {
if (message instanceof RecordData) {
persistence.beTold(message);
getSelfContext().lookup(NodeInstancePersistence.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractClusterWorkerProvider<NodeInstanceReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return NodeInstanceReceiver.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeInstanceReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceReceiver(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -45,4 +52,18 @@ public class NodeInstanceReceiver extends AbstractWorker {
return WorkerConfig.Worker.NodeInstanceReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstanceReceiver.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
......@@ -11,33 +12,39 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostReceiver extends AbstractWorker {
public class ResponseCostReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseCostReceiver.class);
private ResponseCostPersistence persistence;
public ResponseCostReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws Exception {
super.preStart();
persistence = ResponseCostPersistence.Factory.INSTANCE.createWorker(getSelf());
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(ResponseCostPersistence.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void receive(Object message) throws Throwable {
public void work(Object message) throws Exception {
if (message instanceof MetricData) {
persistence.beTold(message);
getSelfContext().lookup(ResponseCostPersistence.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractClusterWorkerProvider<ResponseCostReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return ResponseCostReceiver.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public ResponseCostReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseCostReceiver(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -45,4 +52,18 @@ public class ResponseCostReceiver extends AbstractWorker {
return WorkerConfig.Worker.ResponseCostReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseCostReceiver.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
......@@ -11,33 +12,39 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseSummaryReceiver extends AbstractWorker {
public class ResponseSummaryReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryReceiver.class);
private ResponseSummaryPersistence persistence;
public ResponseSummaryReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws Exception {
super.preStart();
persistence = ResponseSummaryPersistence.Factory.INSTANCE.createWorker(getSelf());
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(ResponseSummaryPersistence.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void receive(Object message) throws Throwable {
public void work(Object message) throws Exception {
if (message instanceof MetricData) {
persistence.beTold(message);
getSelfContext().lookup(ResponseSummaryPersistence.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractClusterWorkerProvider<ResponseSummaryReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return ResponseSummaryReceiver.class;
public Role role() {
return null;
}
@Override
public ResponseSummaryReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseSummaryReceiver(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -45,4 +52,18 @@ public class ResponseSummaryReceiver extends AbstractWorker {
return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseSummaryReceiver.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref;
import akka.actor.ActorRef;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.AbstractSyncMember;
import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.applicationref.analysis.DAGNodeRefAnalysis;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import java.util.List;
/**
* @author pengys5
*/
public class ApplicationRefMain extends AbstractSyncMember {
public class ApplicationRefMain extends AbstractLocalSyncWorker {
private DAGNodeRefAnalysis dagNodeRefAnalysis;
public ApplicationRefMain(ActorRef actorRef) throws Throwable {
super(actorRef);
dagNodeRefAnalysis = DAGNodeRefAnalysis.Factory.INSTANCE.createWorker(actorRef);
public ApplicationRefMain(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void receive(Object message) throws Exception {
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(DAGNodeRefAnalysis.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void work(Object message) throws Exception {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
TraceSegmentRef traceSegmentRef = traceSegment.getTraceSegment().getPrimaryRef();
if (traceSegmentRef != null && !StringUtil.isEmpty(traceSegmentRef.getApplicationCode())) {
String front = traceSegmentRef.getApplicationCode();
String behind = traceSegment.getTraceSegment().getApplicationCode();
TraceSegment segment = traceSegment.getTraceSegment();
List<TraceSegmentRef> refs = segment.getRefs();
if(refs != null){
for (TraceSegmentRef ref : refs) {
String front = ref.getApplicationCode();
String behind = segment.getApplicationCode();
DAGNodeRefAnalysis.Metric nodeRef = new DAGNodeRefAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), front, behind);
dagNodeRefAnalysis.beTold(nodeRef);
getSelfContext().lookup(DAGNodeRefAnalysis.Role.INSTANCE).tell(nodeRef);
}
}
}
public static class Factory extends AbstractSyncMemberProvider<ApplicationRefMain> {
public static class Factory extends AbstractLocalSyncWorkerProvider<ApplicationRefMain> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ApplicationRefMain.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public ApplicationRefMain workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationRefMain(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ApplicationRefMain.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.receiver.DAGNodeRefReceiver;
......@@ -11,7 +12,6 @@ import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -22,8 +22,8 @@ public class DAGNodeRefAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefAnalysis.class);
public DAGNodeRefAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public DAGNodeRefAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -45,17 +45,22 @@ public class DAGNodeRefAnalysis extends RecordAnalysisMember {
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
tell(DAGNodeRefReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
getClusterContext().lookup(DAGNodeRefReceiver.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeRefAnalysis> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodeRefAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeRefAnalysis.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public DAGNodeRefAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeRefAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -64,6 +69,20 @@ public class DAGNodeRefAnalysis extends RecordAnalysisMember {
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return DAGNodeRefAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String frontCode;
private final String behindCode;
......
package com.a.eye.skywalking.collector.worker.applicationref.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -16,8 +17,8 @@ public class DAGNodeRefPersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefPersistence.class);
public DAGNodeRefPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
public DAGNodeRefPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
......@@ -30,13 +31,18 @@ public class DAGNodeRefPersistence extends RecordPersistenceMember {
return "node_ref";
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeRefPersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodeRefPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeRefPersistence.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public DAGNodeRefPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeRefPersistence(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -44,4 +50,18 @@ public class DAGNodeRefPersistence extends RecordPersistenceMember {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return DAGNodeRefPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.persistence.DAGNodeRefPersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
......@@ -11,33 +12,41 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefReceiver extends AbstractWorker {
public class DAGNodeRefReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefReceiver.class);
private DAGNodeRefPersistence persistence;
public DAGNodeRefReceiver(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws Exception {
super.preStart();
persistence = DAGNodeRefPersistence.Factory.INSTANCE.createWorker(getSelf());
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(DAGNodeRefPersistence.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void receive(Object message) throws Throwable {
public void work(Object message) throws Exception {
if (message instanceof RecordData) {
persistence.beTold(message);
getSelfContext().lookup(DAGNodeRefPersistence.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractClusterWorkerProvider<DAGNodeRefReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return DAGNodeRefReceiver.class;
public Role role() {
return Role.INSTANCE;
}
@Override
public DAGNodeRefReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodeRefReceiver(role(), clusterContext, new LocalWorkerContext());
}
@Override
......@@ -45,4 +54,18 @@ public class DAGNodeRefReceiver extends AbstractWorker {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return DAGNodeRefReceiver.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.Role;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class Controller {
protected abstract NanoHTTPD.Method httpMethod();
protected abstract String path();
protected abstract JsonElement execute(Map<String, String> parms);
protected void tell(Role role, Object message) throws Exception {
// targetMember.beTold(message);
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.httpserver;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public enum ControllerCenter {
INSTANCE;
private Map<String, Controller> getControllers = new ConcurrentHashMap();
private Map<String, Controller> postControllers = new ConcurrentHashMap();
protected void register(NanoHTTPD.Method method, String path, Controller controller) throws DuplicateControllerException {
if (NanoHTTPD.Method.GET.equals(method)) {
if (getControllers.containsKey(path)) {
throw new DuplicateControllerException("method: " + method + "with path: " + path + " duplicate each other");
} else {
getControllers.put(path, controller);
}
} else if (NanoHTTPD.Method.POST.equals(method)) {
if (postControllers.containsKey(path)) {
throw new DuplicateControllerException("method: " + method + "with path: " + path + " duplicate each other");
} else {
postControllers.put(path, controller);
}
}
}
protected Controller find(NanoHTTPD.Method method, String path) {
if (NanoHTTPD.Method.GET.equals(method)) {
return getControllers.get(path);
} else if (NanoHTTPD.Method.POST.equals(method)) {
return postControllers.get(path);
} else {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ControllerCreator {
INSTANCE;
public void boot() throws Exception {
ServiceLoader<ControllerProvider> controllerLoader = java.util.ServiceLoader.load(ControllerProvider.class);
for (ControllerProvider provider : controllerLoader) {
Controller controller = provider.create();
ControllerCenter.INSTANCE.register(controller.httpMethod(), controller.path(), controller);
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
public class ControllerNotFoundException extends Exception {
public ControllerNotFoundException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
/**
* @author pengys5
*/
public abstract class ControllerProvider {
public abstract Class clazz();
public Controller create() throws Exception {
Controller controller = (Controller) clazz().newInstance();
return controller;
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
public class DuplicateControllerException extends Exception {
public DuplicateControllerException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Map;
/**
* @author pengys5
*/
public enum HttpServer {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(HttpServer.class);
public void boot() throws Exception {
NanoHttpServer server = new NanoHttpServer(7001);
ControllerCreator.INSTANCE.boot();
}
public class NanoHttpServer extends NanoHTTPD {
public NanoHttpServer(int port) throws IOException {
super(port);
start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
logger.info("Running! Point your browsers to http://localhost:%d/", port);
}
@Override
public Response serve(IHTTPSession session) {
Method method = session.getMethod();
String uri = session.getUri();
Map<String, String> parms = session.getParms();
logger.debug("request method: %s, uri: %s, parms: %s", method.toString(), uri, parms);
try {
JsonElement response = RequestDispatcher.INSTANCE.dispatch(method, uri, parms);
return newFixedLengthResponse(Response.Status.OK, "text/json", response.toString());
} catch (ControllerNotFoundException e) {
String errorMessage = e.getMessage();
return newFixedLengthResponse(Response.Status.NOT_FOUND, "text/html", errorMessage);
}
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public enum RequestDispatcher {
INSTANCE;
public JsonElement dispatch(NanoHTTPD.Method method, String uri, Map<String, String> parms) throws ControllerNotFoundException {
Controller controller = ControllerCenter.INSTANCE.find(method, uri);
if (controller != null) {
return controller.execute(parms);
} else {
throw new ControllerNotFoundException("Could not found controller for [method: " + method.name() + ", uri: " + uri + "]");
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver.controller;
import com.a.eye.skywalking.collector.worker.httpserver.Controller;
import com.a.eye.skywalking.collector.worker.httpserver.ControllerProvider;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public class DagController extends Controller {
@Override
public NanoHTTPD.Method httpMethod() {
return NanoHTTPD.Method.GET;
}
@Override
public String path() {
return "/getNodes";
}
@Override
public JsonElement execute(Map<String, String> parms) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("test", "aaaa");
return jsonObject;
}
public static class Factory extends ControllerProvider {
@Override
public Class clazz() {
return DagController.class;
}
}
}
package com.a.eye.skywalking.collector.worker.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.role.TraceSegmentReceiverRole;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.ApplicationMain;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMain;
......@@ -14,22 +14,22 @@ import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class TraceSegmentReceiver extends AbstractWorker {
public class TraceSegmentReceiver extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class);
private ApplicationMain applicationMain;
private ApplicationRefMain applicationRefMain;
public TraceSegmentReceiver(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public TraceSegmentReceiver() throws Exception {
applicationMain = ApplicationMain.Factory.INSTANCE.createWorker(getSelf());
applicationRefMain = ApplicationRefMain.Factory.INSTANCE.createWorker(getSelf());
@Override
public void preStart() throws ProviderNotFountException {
getClusterContext().findProvider(ApplicationMain.Role.INSTANCE).create(getClusterContext(), getSelfContext());
getClusterContext().findProvider(ApplicationRefMain.Role.INSTANCE).create(getClusterContext(), getSelfContext());
}
@Override
public void receive(Object message) throws Throwable {
public void work(Object message) throws Exception {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", traceSegment.getTraceSegmentId());
......@@ -37,22 +37,27 @@ public class TraceSegmentReceiver extends AbstractWorker {
int second = DateTools.timeStampToSecond(traceSegment.getStartTime());
TraceSegmentTimeSlice segmentTimeSlice = new TraceSegmentTimeSlice(timeSlice, second, traceSegment);
tell(applicationMain, segmentTimeSlice);
tell(applicationRefMain, segmentTimeSlice);
getSelfContext().lookup(ApplicationMain.Role.INSTANCE).tell(segmentTimeSlice);
getSelfContext().lookup(ApplicationRefMain.Role.INSTANCE).tell(segmentTimeSlice);
}
}
public static class Factory extends AbstractWorkerProvider {
public static class Factory extends AbstractClusterWorkerProvider<TraceSegmentReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return TraceSegmentReceiver.class;
public int workerNum() {
return WorkerConfig.Worker.TraceSegmentReceiver.Num;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.TraceSegmentReceiver.Num;
public Role role() {
return TraceSegmentReceiverRole.INSTANCE;
}
@Override
public TraceSegmentReceiver workerInstance(ClusterWorkerContext clusterContext) {
return new TraceSegmentReceiver(role(), clusterContext, new LocalWorkerContext());
}
}
......
com.a.eye.skywalking.collector.worker.httpserver.controller.DagController$Factory
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册