提交 2fb18ebc 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #97 from wu-sheng/feature/collector

Add collector modules to 3.0
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.a.eye</groupId>
<artifactId>skywalking</artifactId>
<version>3.0-2017</version>
<groupId>com.a.eye</groupId>
<artifactId>skywalking</artifactId>
<version>3.0-2017</version>
<licenses>
<license>
<name>GNU GENERAL PUBLIC LICENSE V3</name>
<url>https://github.com/wu-sheng/sky-walking/blob/master/LICENSE</url>
</license>
</licenses>
<licenses>
<license>
<name>GNU GENERAL PUBLIC LICENSE V3</name>
<url>https://github.com/wu-sheng/sky-walking/blob/master/LICENSE</url>
</license>
</licenses>
<developers>
<developer>
<name>Wu Sheng</name>
<email>wu.sheng@foxmail.com</email>
<url>https://wu-sheng.github.io/me/</url>
</developer>
<developer>
<name>Zhang Xin</name>
<url>https://github.com/ascrutae</url>
</developer>
</developers>
<developers>
<developer>
<name>Wu Sheng</name>
<email>wu.sheng@foxmail.com</email>
<url>https://wu-sheng.github.io/me/</url>
</developer>
<developer>
<name>Zhang Xin</name>
<url>https://github.com/ascrutae</url>
</developer>
</developers>
<modules>
<module>skywalking-commons</module>
<module>skywalking-sniffer</module>
<module>skywalking-application-toolkit</module>
</modules>
<packaging>pom</packaging>
<modules>
<module>skywalking-commons</module>
<module>skywalking-sniffer</module>
<module>skywalking-application-toolkit</module>
<module>skywalking-collector</module>
</modules>
<packaging>pom</packaging>
<name>skywalking</name>
<url>https://github.com/wu-sheng/sky-walking</url>
<name>skywalking</name>
<url>https://github.com/wu-sheng/sky-walking</url>
<issueManagement>
<system>GitHub</system>
<url>https://github.com/wu-sheng/sky-walking/issues</url>
</issueManagement>
<issueManagement>
<system>GitHub</system>
<url>https://github.com/wu-sheng/sky-walking/issues</url>
</issueManagement>
<ciManagement>
<system>travis</system>
<url>https://travis-ci.org/wu-sheng/sky-walking</url>
</ciManagement>
<ciManagement>
<system>travis</system>
<url>https://travis-ci.org/wu-sheng/sky-walking</url>
</ciManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
<powermock.version>1.6.4</powermock.version>
<docker.plugin.version>0.4.13</docker.plugin.version>
<skywalking.version>2.1-2017</skywalking.version>
</properties>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
<scala.compiler.version>2.11.7</scala.compiler.version>
<powermock.version>1.6.4</powermock.version>
<docker.plugin.version>0.4.13</docker.plugin.version>
<skywalking.version>2.1-2017</skywalking.version>
</properties>
<dependencies>
<dependency>
......@@ -161,5 +162,4 @@
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>skywalking-collector-cluster</module>
<module>skywalking-collector-worker</module>
</modules>
<parent>
<artifactId>skywalking</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<artifactId>skywalking-collector</artifactId>
<packaging>pom</packaging>
<properties>
<akka.version>2.4.17</akka.version>
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-metrics_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-tools_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.9</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>skywalking-collector</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-collector-cluster</artifactId>
<packaging>jar</packaging>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
/**
* @author pengys5
*/
public abstract class AbstractAsyncMember extends AbstractMember {
private RingBuffer<MessageHolder> ringBuffer;
public AbstractAsyncMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(actorRef);
this.ringBuffer = ringBuffer;
}
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
Object message = event.getMessage();
event.reset();
receive(message);
if (endOfBatch) {
receive(new EndOfBatchCommand());
}
}
public void beTold(Object message) throws Exception {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
} finally {
ringBuffer.publish(sequence);
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.DaemonThreadFactory;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.queue.MessageHolderFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.lang.reflect.Constructor;
/**
* @author pengys5
*/
public abstract class AbstractAsyncMemberProvider<T extends EventHandler> extends AbstractMemberProvider<T> {
public abstract int queueSize();
@Override
public T createWorker(ActorRef actorRef) throws Exception {
if (memberClass() == null) {
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
}
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class<?>[]{RingBuffer.class, ActorRef.class});
memberConstructor.setAccessible(true);
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = queueSize();
// Construct the Disruptor
Disruptor<MessageHolder> disruptor = new Disruptor<MessageHolder>(MessageHolderFactory.INSTANCE, bufferSize, DaemonThreadFactory.INSTANCE);
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
T member = (T) memberConstructor.newInstance(ringBuffer, actorRef);
// Connect the handler
disruptor.handleEventsWith(member);
// Start the Disruptor, starts all threads running
disruptor.start();
return member;
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.EventHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
public abstract class AbstractMember implements EventHandler<MessageHolder> {
private Logger logger = LogManager.getFormatterLogger(AbstractMember.class);
private ActorRef actorRef;
private ActorRef getSelf() {
return actorRef;
}
public AbstractMember(ActorRef actorRef) {
this.actorRef = actorRef;
}
protected abstract void beTold(Object message) throws Exception;
/**
* Receive the message to analyse.
*
* @param message is the data send from the forward worker
* @throws Exception is the exception thrown by that worker implementation processing
*/
public abstract void receive(Object message) throws Exception;
/**
* Send analysed data to next Worker.
*
* @param targetWorkerProvider is the worker provider to create worker instance.
* @param selector is the selector to select a same role worker instance form cluster.
* @param message is the data used to send to next worker.
* @throws Exception
*/
public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Exception {
logger.debug("worker provider: %s ,role name: %s", targetWorkerProvider.getClass().getName(), targetWorkerProvider.roleName());
List<WorkerRef> availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName());
selector.select(availableWorks, message).tell(message, getSelf());
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
/**
* @author pengys5
*/
public abstract class AbstractMemberProvider<T> {
public abstract Class memberClass();
public abstract T createWorker(ActorRef actorRef) throws Exception;
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
/**
* @author pengys5
*/
public abstract class AbstractSyncMember extends AbstractMember {
public AbstractSyncMember(ActorRef actorRef) {
super(actorRef);
}
@Override
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception {
}
@Override
public void beTold(Object message) throws Exception {
receive(message);
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorRef;
import java.lang.reflect.Constructor;
/**
* @author pengys5
*/
public abstract class AbstractSyncMemberProvider<T> extends AbstractMemberProvider<T> {
@Override
public T createWorker(ActorRef actorRef) throws Exception {
if (memberClass() == null) {
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()");
}
Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class<?>[]{ActorRef.class});
memberConstructor.setAccessible(true);
T member = (T) memberConstructor.newInstance(actorRef);
return member;
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.cluster.WorkersRefCenter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* Abstract implementation of the {@link akka.actor.UntypedActor} that represents an
* analysis unit. <code>AbstractWorker</code> implementation process the message in
* {@link #receive(Object)} method.
* <p>
* <p>
* Subclasses must implement the abstract {@link #receive(Object)} method to process message.
* Subclasses forbid to override the {@link #onReceive(Object)} method.
* <p>
* Here is an example on how to create and use an {@link AbstractWorker}:
* <p>
* {{{
* public class SampleWorker extends AbstractWorker {
*
* @author pengys5
* @Override public void receive(Object message) throws Throwable {
* if (message.equals("Tell Next")) {
* Object sendMessage = new Object();
* tell(new NextSampleWorkerFactory(), RollingSelector.INSTANCE, sendMessage);
* }
* }
* }
* }}}
*/
public abstract class AbstractWorker extends UntypedActor {
private Logger logger = LogManager.getFormatterLogger(AbstractWorker.class);
private Cluster cluster = Cluster.get(getContext().system());
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.MemberUp.class);
}
@Override
public void postStop() throws Exception {
cluster.unsubscribe(getSelf());
}
/**
* Receive the message to analyse.
*
* @param message is the data send from the forward worker
* @throws Throwable is the exception thrown by that worker implementation processing
*/
public abstract void receive(Object message) throws Throwable;
/**
* Listening {@link ClusterEvent.MemberUp} and {@link ClusterEvent.CurrentClusterState}
* cluster event, when event send from the member of {@link WorkersListener} then tell
* the sender to register self.
*/
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
register(memberUp.member());
} else {
logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
receive(message);
}
}
/**
* Send analysed data to next Worker.
*
* @param targetWorkerProvider is the worker provider to create worker instance.
* @param selector is the selector to select a same role worker instance form cluster.
* @param message is the data used to send to next worker.
* @throws Throwable
*/
public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Throwable {
List<WorkerRef> availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName());
selector.select(availableWorks, message).tell(message, getSelf());
}
public void tell(AbstractMember targetMember, Object message) throws Exception {
targetMember.beTold(message);
}
/**
* When member role is {@link WorkersListener#WorkName} then Select actor from context
* and send register message to {@link WorkersListener}
*
* @param member is the new created or restart worker
*/
void register(Member member) {
if (member.hasRole(WorkersListener.WorkName)) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(getClass().getSimpleName());
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
}
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The <code>AbstractWorkerProvider</code> should be implemented by any class whose
* instances are intended to provide create instance of the {@link AbstractWorker}.
* The {@link WorkersCreator} use java service loader to load provider implementer,
* so you should config the service file.
* <p>
* Here is an example on how to create and use an {@link AbstractWorkerProvider}:
* <p>
* {{{
* public class SampleWorkerFactory extends AbstractWorkerProvider {
*
* @author pengys5
* @Override public Class workerClass() {
* return SampleWorker.class;
* }
* @Override public int workerNum() {
* return Config.SampleWorkerNum;
* }
* }
* }}}
* <p>
*/
public abstract class AbstractWorkerProvider {
private Logger logger = LogManager.getFormatterLogger(AbstractWorkerProvider.class);
public abstract Class workerClass();
public abstract int workerNum();
public void createWorker(ActorSystem system) {
if (workerClass() == null) {
throw new IllegalArgumentException("cannot createInstance() with nothing obtained from workerClass()");
}
if (workerNum() <= 0) {
throw new IllegalArgumentException("cannot createInstance() with obtained from workerNum() must greater than 0");
}
for (int i = 1; i <= workerNum(); i++) {
system.actorOf(Props.create(workerClass()), roleName() + "_" + i);
logger.info("create akka actor, actor id is %s", roleName() + "_" + i);
}
}
/**
* Use {@link #workerClass()} method returned class's simple name as a role name.
*
* @return is role of Worker
*/
protected String roleName() {
return workerClass().getSimpleName();
}
}
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public class CollectorBootstrap {
public static void main(String[] args) {
// ActorSystem system = ActorSystem.create("ClusterSystem", config);
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The Worker reference
*
* @author pengys5
*/
public class WorkerRef {
private Logger logger = LogManager.getFormatterLogger(WorkerRef.class);
final ActorRef actorRef;
final String workerRole;
public WorkerRef(ActorRef actorRef, String workerRole) {
this.actorRef = actorRef;
this.workerRole = workerRole;
}
void tell(Object message, ActorRef sender) {
logger.debug("tell %s worker", actorRef.toString());
actorRef.tell(message, sender);
}
public ActorPath path() {
return actorRef.path();
}
public String getWorkerRole() {
return workerRole;
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ServiceLoader;
/**
* <code>WorkersCreator</code> is a util that use Java Spi to create
* workers by META-INF config file.
*
* @author pengys5
*/
public enum WorkersCreator {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(WorkersCreator.class);
/**
* create worker to use Java Spi.
*
* @param system is create by akka {@link ActorSystem}
*/
public void boot(ActorSystem system) {
system.actorOf(Props.create(WorkersListener.class), WorkersListener.WorkName);
ServiceLoader<AbstractWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractWorkerProvider.class);
for (AbstractWorkerProvider provider : clusterServiceLoader) {
logger.info("create worker {%s} using java service loader", provider.workerClass().getName());
provider.createWorker(system);
}
}
}
package com.a.eye.skywalking.collector.actor.selector;
/**
* @author pengys5
*/
public abstract class AbstractHashMessage {
private int hashCode;
public AbstractHashMessage(String key) {
this.hashCode = key.hashCode();
}
protected int getHashCode() {
return hashCode;
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} by message hashcode, so it can use to send the same hashcode
* message to same {@link WorkerRef}. Usually, use to database operate which avoid dirty data.
*
* @author pengys5
*/
public enum HashCodeSelector implements WorkerSelector {
INSTANCE;
/**
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
AbstractHashMessage hashMessage = (AbstractHashMessage) message;
int size = members.size();
int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return members.get(selectIndex);
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} nearly random, by round-robin.
*
* @author wusheng
*/
public enum RollingSelector implements WorkerSelector {
INSTANCE;
/**
* A simple round variable.
*/
private int index = 0;
/**
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
int size = members.size();
index++;
int selectIndex = Math.abs(index) % size;
return members.get(selectIndex);
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>WorkerSelector</code> should be implemented
* by any class whose instances are intended to provide select a {@link WorkerRef} from a {@link WorkerRef} list.
* <p></p>
* Actually, the <code>WorkerRef</code> is designed to provide a routing ability in the collector cluster.
*
* @author wusheng
*/
public interface WorkerSelector {
/**
* select a {@link WorkerRef} from a {@link WorkerRef} list.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
WorkerRef select(List<WorkerRef> members, Object message);
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorSystem;
/**
* A static class contains some config values of cluster.
* {@link Cluster.Current#hostname} is a ip address of server which start this process.
* {@link Cluster.Current#port} is a port of server use to bind
* {@link Cluster.Current#roles} is a roles of workers that use to create workers which
* has those role in this process.
* {@link Cluster#nodes} is a nodes which cluster have.
* {@link Cluster#appname} is a name of {@link ActorSystem} in cluster.
*
* @author pengys5
*/
public class ClusterConfig {
public static class Cluster {
public static class Current {
public static String hostname = "127.0.0.1";
public static String port = "2551";
public static String roles = "";
}
public static String nodes = "127.0.0.1:2551";
public static final String appname = "CollectorSystem";
public static final String provider = "akka.cluster.ClusterActorRefProvider";
}
}
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.api.util.ConfigInitializer;
import com.a.eye.skywalking.api.util.StringUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.util.Properties;
/**
* <code>ClusterConfigInitializer</code> Contains static methods for setting
* {@link ClusterConfig} attributes value.
*
* <p>
* The priority of value setting is
* system property -> collector.config -> {@link ClusterConfig} default value
* <p>
*
* @author pengys5
*/
public class ClusterConfigInitializer {
private static Logger logger = LogManager.getFormatterLogger(ClusterConfigInitializer.class);
public static final String ConfigFileName = "collector.config";
/**
* Read config file to setting {@link ClusterConfig} then get system property to overwrite it.
*
* @param configFileName is the config file name, the file format is key-value pairs
*/
public static void initialize(String configFileName) {
InputStream configFileStream = ClusterConfigInitializer.class.getResourceAsStream("/" + configFileName);
if (configFileStream == null) {
logger.info("Not provide sky-walking certification documents, sky-walking api run in default config.");
} else {
try {
Properties properties = new Properties();
properties.load(configFileStream);
ConfigInitializer.initialize(properties, ClusterConfig.class);
} catch (Exception e) {
logger.error("Failed to read the config file, sky-walking api run in default config.", e);
}
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.hostname"))) {
ClusterConfig.Cluster.Current.hostname = System.getProperty("cluster.current.hostname");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.port"))) {
ClusterConfig.Cluster.Current.port = System.getProperty("cluster.current.port");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.roles"))) {
ClusterConfig.Cluster.Current.roles = System.getProperty("cluster.current.roles");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.nodes"))) {
ClusterConfig.Cluster.nodes = System.getProperty("cluster.nodes");
}
}
}
package com.a.eye.skywalking.collector.cluster;
/**
* The <code>NoAvailableWorkerException</code> represents no available member,
* when the {@link WorkersRefCenter#availableWorks(String)} try to get the list.
*
* Most likely, in the cluster, these is no active worker of the particular role.
*
* @author wusheng
*/
public class NoAvailableWorkerException extends Exception {
public NoAvailableWorkerException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.cluster;
import java.io.Serializable;
/**
* <code>WorkerListenerMessage</code> is a message just for the worker
* implementation of the {@link com.a.eye.skywalking.collector.actor.AbstractWorker}
* to register.
*
* @author pengys5
*/
public class WorkerListenerMessage {
public static class RegisterMessage implements Serializable {
public final String workRole;
public RegisterMessage(String workRole) {
this.workRole = workRole;
}
public String getWorkRole() {
return workRole;
}
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* <code>WorkersListener</code> listening the register message from workers
* implementation of the {@link com.a.eye.skywalking.collector.actor.AbstractWorker}
* and terminated message from akka cluster.
* <p>
* when listened register message then begin to watch the state for this worker
* and register to {@link WorkersRefCenter}.
* <p>
* when listened terminate message then unregister from {@link WorkersRefCenter}.
*
* @author pengys5
*/
public class WorkersListener extends UntypedActor {
private Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
public static final String WorkName = "WorkersListener";
private Cluster cluster = Cluster.get(getContext().system());
@Override
public void preStart() throws Exception {
cluster.subscribe(getSelf(), ClusterEvent.UnreachableMember.class);
}
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof WorkerListenerMessage.RegisterMessage) {
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
ActorRef sender = getSender();
logger.info("register worker of role: %s, path: %s", register.getWorkRole(), sender.toString());
WorkersRefCenter.INSTANCE.register(sender, register.getWorkRole());
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
WorkersRefCenter.INSTANCE.unregister(terminated.getActor());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember) message;
WorkersRefCenter.INSTANCE.unregister(unreachableMember.member().address());
} else {
unhandled(message);
}
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.Address;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* <code>WorkersRefCenter</code> represent a cache center,
* store all {@link ActorRef}s, each of them represent a Akka Actor instance.
* All the Actors in this JVM, can find alive-actor in here, and send message.
*
* @author wusheng
*/
public enum WorkersRefCenter {
INSTANCE;
private Map<String, List<WorkerRef>> roleToWorkerRef = new ConcurrentHashMap();
private Map<ActorRef, WorkerRef> actorRefToWorkerRef = new ConcurrentHashMap<>();
public void register(ActorRef newActorRef, String workerRole) {
if (!roleToWorkerRef.containsKey(workerRole)) {
List<WorkerRef> actorList = Collections.synchronizedList(new ArrayList<WorkerRef>());
roleToWorkerRef.putIfAbsent(workerRole, actorList);
}
WorkerRef newWorkerRef = new WorkerRef(newActorRef, workerRole);
roleToWorkerRef.get(workerRole).add(newWorkerRef);
actorRefToWorkerRef.put(newActorRef, newWorkerRef);
}
public void unregister(ActorRef oldActorRef) {
WorkerRef oldWorkerRef = actorRefToWorkerRef.get(oldActorRef);
roleToWorkerRef.get(oldWorkerRef.getWorkerRole()).remove(oldWorkerRef);
actorRefToWorkerRef.remove(oldActorRef);
}
public void unregister(Address address) {
Iterator<ActorRef> actorRefToWorkerRefIterator = actorRefToWorkerRef.keySet().iterator();
while (actorRefToWorkerRefIterator.hasNext()) {
if (address.equals(actorRefToWorkerRefIterator.next().path().address())) {
actorRefToWorkerRefIterator.remove();
}
}
Iterator<Map.Entry<String, List<WorkerRef>>> roleToWorkerRefIterator = roleToWorkerRef.entrySet().iterator();
while (roleToWorkerRefIterator.hasNext()) {
List<WorkerRef> workerRefList = roleToWorkerRefIterator.next().getValue();
Iterator<WorkerRef> workerRefIterator = workerRefList.iterator();
while (workerRefIterator.hasNext()) {
if (workerRefIterator.next().path().address().equals(address)) {
workerRefIterator.remove();
}
}
}
}
/**
* Get all available {@link WorkerRef} list, by the given worker role.
*
* @param workerRole the given role
* @return available {@link WorkerRef} list
* @throws NoAvailableWorkerException , when no available worker.
*/
public List<WorkerRef> availableWorks(String workerRole) throws NoAvailableWorkerException {
List<WorkerRef> refs = roleToWorkerRef.get(workerRole);
if (refs == null || refs.size() == 0) {
throw new NoAvailableWorkerException("role=" + workerRole + ", no available worker.");
}
return Collections.unmodifiableList(refs);
}
}
package com.a.eye.skywalking.collector.queue;
import java.util.concurrent.ThreadFactory;
/**
* @author pengys5
*/
public enum DaemonThreadFactory implements ThreadFactory {
INSTANCE;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
package com.a.eye.skywalking.collector.queue;
/**
* @author pengys5
*/
public class EndOfBatchCommand {
}
package com.a.eye.skywalking.collector.queue;
/**
* @author pengys5
*/
public class MessageHolder {
private Object message;
public Object getMessage() {
return message;
}
public void setMessage(Object message) {
this.message = message;
}
public void reset() {
message = null;
}
}
package com.a.eye.skywalking.collector.queue;
import com.lmax.disruptor.EventFactory;
/**
* @author pengys5
*/
public class MessageHolderFactory implements EventFactory<MessageHolder> {
public static MessageHolderFactory INSTANCE = new MessageHolderFactory();
public MessageHolder newInstance() {
return new MessageHolder();
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class AbstractWorkerProviderTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
}
@Test(expected = IllegalArgumentException.class)
public void testCreateWorkerWhenWorkerClassIsNull() {
AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() {
@Override
public Class workerClass() {
return Object.class;
}
@Override
public int workerNum() {
return 1;
}
};
aWorkerProvider.createWorker(system);
}
@Test(expected = IllegalArgumentException.class)
public void testCreateWorkerWhenWorkerNumLessThan_1() {
AbstractWorkerProvider aWorkerProvider = new AbstractWorkerProvider() {
@Override
public Class workerClass() {
return null;
}
@Override
public int workerNum() {
return 0;
}
};
aWorkerProvider.createWorker(system);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
/**
* @author pengys5
*/
public class SpiTestWorker extends AbstractWorker {
@Override
public void receive(Object message) throws Throwable {
if (message.equals("Test1")) {
getSender().tell("Yes", getSelf());
} else if (message.equals("Test2")) {
getSender().tell("No", getSelf());
} else if (message.equals("Test3")) {
Object sendMessage = new Object();
tell(new SpiTestWorkerFactory(), RollingSelector.INSTANCE, sendMessage);
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public class SpiTestWorkerFactory extends AbstractWorkerProvider {
@Override
public Class workerClass() {
return SpiTestWorker.class;
}
@Override
public int workerNum() {
return 2;
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class SpiTestWorkerFactoryTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
}
@Test
public void testCreateWorker() {
new JavaTestKit(system) {{
SpiTestWorkerFactory aWorkerProvider = new SpiTestWorkerFactory();
aWorkerProvider.createWorker(system);
system.actorSelection("/user/" + aWorkerProvider.roleName() + "_1").tell("Test1", getRef());
expectMsgEquals(duration("1 second"), "Yes");
system.actorSelection("/user/" + aWorkerProvider.roleName() + "_2").tell("Test2", getRef());
expectMsgEquals(duration("1 second"), "No");
}};
}
}
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class WorkersCreatorTestCase {
ActorSystem system;
@Before
public void createSystem() {
system = ActorSystem.create();
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
}
@Test
public void testBoot() {
new JavaTestKit(system) {{
WorkersCreator.INSTANCE.boot(system);
system.actorSelection("/user/SpiTestWorker_1").tell("Test1", getRef());
expectMsgEquals(duration("1 second"), "Yes");
system.actorSelection("/user/SpiTestWorker_2").tell("Test2", getRef());
expectMsgEquals(duration("1 second"), "No");
}};
}
}
package com.a.eye.skywalking.collector.cluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class CollectorConfigTestCase {
@Before
public void resetArguments() {
System.clearProperty("cluster.current.hostname");
System.clearProperty("cluster.current.port");
System.clearProperty("cluster.current.roles");
System.clearProperty("cluster.nodes");
}
@Test
public void testInitializeUseConfigFile() {
ClusterConfigInitializer.initialize("collector.config");
Assert.assertEquals("192.168.0.1", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1000", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("[Test, Test1]", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("[192.168.0.1:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes);
}
@Test
public void testInitializeUseArguments() {
System.setProperty("cluster.current.hostname", "192.168.0.2");
System.setProperty("cluster.current.port", "1001");
System.setProperty("cluster.current.roles", "Test3, Test4");
System.setProperty("cluster.nodes", "[192.168.0.2:1000, 192.168.0.2:1000]");
ClusterConfigInitializer.initialize("collector.config");
Assert.assertEquals("192.168.0.2", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1001", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("Test3, Test4", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("[192.168.0.2:1000, 192.168.0.2:1000]", ClusterConfig.Cluster.nodes);
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.pattern.Patterns;
import akka.testkit.TestActorRef;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import scala.concurrent.Future;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class WorkerListenerTestCase {
ActorSystem system;
TestActorRef<WorkersListener> senderActorRef;
TestActorRef<WorkersListener> receiveactorRef;
@Before
public void initData() {
system = ActorSystem.create();
final Props props = Props.create(WorkersListener.class);
senderActorRef = TestActorRef.create(system, props, "WorkersListenerSender");
receiveactorRef = TestActorRef.create(system, props, "WorkersListenerReceive");
WorkerListenerMessage.RegisterMessage message = new WorkerListenerMessage.RegisterMessage("WorkersListener");
receiveactorRef.tell(message, senderActorRef);
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
}
@Test
public void testRegister() throws IllegalAccessException {
Map<ActorRef, WorkerRef> actorRefToWorkerRef = (Map<ActorRef, WorkerRef>) MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").get(WorkersRefCenter.INSTANCE);
ActorRef senderRefInWorkerRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(actorRefToWorkerRef.get(senderActorRef));
Assert.assertEquals(senderActorRef, senderRefInWorkerRef);
Map<String, List<WorkerRef>> roleToWorkerRef = (Map<String, List<WorkerRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").get(WorkersRefCenter.INSTANCE);
WorkerRef workerRef = roleToWorkerRef.get("WorkersListener").get(0);
senderRefInWorkerRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRef);
Assert.assertEquals(senderActorRef, senderRefInWorkerRef);
}
}
package com.a.eye.skywalking.collector.cluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestActorRef;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public class WorkersRefCenterTestCase {
ActorSystem system;
TestActorRef<WorkersListener> actorRef1;
TestActorRef<WorkersListener> actorRef2;
TestActorRef<WorkersListener> actorRef3;
@Before
public void createSystem() {
system = ActorSystem.create();
final Props props = Props.create(WorkersListener.class);
actorRef1 = TestActorRef.create(system, props, "WorkersListener1");
actorRef2 = TestActorRef.create(system, props, "WorkersListener2");
actorRef3 = TestActorRef.create(system, props, "WorkersListener3");
WorkersRefCenter.INSTANCE.register(actorRef1, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef2, "WorkersListener");
WorkersRefCenter.INSTANCE.register(actorRef3, "WorkersListener");
}
@After
public void terminateSystem() throws IllegalAccessException {
system.terminate();
system.awaitTermination();
system = null;
MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").set(WorkersRefCenter.INSTANCE, new ConcurrentHashMap());
}
@Test
public void testRegister() throws IllegalAccessException {
Map<String, List<WorkerRef>> roleToActor = (Map<String, List<WorkerRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").get(WorkersRefCenter.INSTANCE);
List<WorkerRef> workerRefs = roleToActor.get("WorkersListener");
ActorRef actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRefs.get(0));
Assert.assertEquals(actorRef1, actorRef);
actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRefs.get(1));
Assert.assertEquals(actorRef2, actorRef);
actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(workerRefs.get(2));
Assert.assertEquals(actorRef3, actorRef);
Map<ActorRef, WorkerRef> actorToRole = (Map<ActorRef, WorkerRef>) MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").get(WorkersRefCenter.INSTANCE);
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef1).getWorkerRole());
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef2).getWorkerRole());
Assert.assertEquals("WorkersListener", actorToRole.get(actorRef3).getWorkerRole());
}
@Test
public void testUnRegister() throws IllegalAccessException {
WorkersRefCenter.INSTANCE.unregister(actorRef1);
Map<String, List<WorkerRef>> roleToWorkerRef = (Map<String, List<WorkerRef>>) MemberModifier.field(WorkersRefCenter.class, "roleToWorkerRef").get(WorkersRefCenter.INSTANCE);
ActorRef actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(roleToWorkerRef.get("WorkersListener").get(0));
Assert.assertEquals(actorRef2, actorRef);
actorRef = (ActorRef) MemberModifier.field(WorkerRef.class, "actorRef").get(roleToWorkerRef.get("WorkersListener").get(1));
Assert.assertEquals(actorRef3, actorRef);
Map<ActorRef, WorkerRef> actorRefToWorkerRef = (Map<ActorRef, WorkerRef>) MemberModifier.field(WorkersRefCenter.class, "actorRefToWorkerRef").get(WorkersRefCenter.INSTANCE);
Assert.assertEquals(null, actorRefToWorkerRef.get(actorRef1));
}
@Test
public void testSizeOf() throws NoAvailableWorkerException {
Assert.assertEquals(3, WorkersRefCenter.INSTANCE.availableWorks("WorkersListener").size());
}
}
package com.a.eye.skywalking.collector.queue;
import org.junit.Test;
/**
* @author pengys5
*/
public class QueueTestCase {
@Test
public void testProducer() throws InterruptedException {
}
}
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
// "java.io.Serializable" = none
}
// serialize-messages = on
warn-about-java-serializer-usage = on
}
}
\ No newline at end of file
cluster.current.hostname = 192.168.0.1
cluster.current.port = 1000
cluster.current.roles = [Test, Test1]
cluster.nodes = [192.168.0.1:1000, 192.168.0.2:1000]
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n" />
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>skywalking-collector</artifactId>
<groupId>com.a.eye</groupId>
<version>3.0-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-collector-worker</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.2</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-sniffer-mock</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMember;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class AnalysisMember extends AbstractAsyncMember {
private Logger logger = LogManager.getFormatterLogger(AnalysisMember.class);
public AnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public abstract void analyse(Object message) throws Exception;
@Override
public void receive(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
analyse(message);
}
}
protected abstract void aggregation() throws Exception;
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.actor.WorkersCreator;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.NoAvailableWorkerException;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.UnknownHostException;
/**
* @author pengys5
*/
public class CollectorBootStartUp {
public static void main(String[] args) throws NoAvailableWorkerException, InterruptedException, UnknownHostException {
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.actor.provider=" + ClusterConfig.Cluster.provider)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)).
withFallback(ConfigFactory.load("application.conf"));
ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
WorkersCreator.INSTANCE.boot(system);
EsClient.boot();
}
}
package com.a.eye.skywalking.collector.worker;
import akka.serialization.JSerializer;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* @author pengys5
*/
public class JsonSerializer extends JSerializer {
@Override
public boolean includeManifest() {
return false;
}
@Override
public int identifier() {
return 123;
}
@Override
public byte[] toBinary(Object o) {
// System.out.println("Json toBinary");
JsonObject jsonObject = (JsonObject) o;
return jsonObject.toString().getBytes();
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
// System.out.println("Json fromBinaryJava");
Gson gson = new Gson();
return gson.fromJson(new String(bytes), JsonObject.class);
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class MetricAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(MetricAnalysisMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricAnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public void setMetric(String id, int second, Long value) throws Exception {
persistenceData.getElseCreate(id).setMetric(second, value);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
aggregation();
}
}
public MetricData pushOne() {
if (persistenceData.iterator().hasNext()) {
return persistenceData.pushOne();
}
return null;
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.Client;
import java.util.Iterator;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class MetricPersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(MetricPersistenceMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricPersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof MetricData) {
MetricData metricData = (MetricData) message;
persistenceData.getElseCreate(metricData.getId()).merge(metricData);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
} else {
logger.error("message unhandled");
}
}
protected void persistence() {
MultiGetResponse multiGetResponse = searchFromEs();
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
if (response != null && response.isExists()) {
persistenceData.getElseCreate(response.getId()).merge(response.getSource());
}
}
boolean success = saveToEs();
if (success) {
persistenceData.clear();
}
}
public MultiGetResponse searchFromEs() {
Client client = EsClient.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
Iterator<Map.Entry<String, MetricData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
multiGetRequestBuilder.add(esIndex(), esType(), iterator.next().getKey());
}
MultiGetResponse multiGetResponse = multiGetRequestBuilder.get();
return multiGetResponse;
}
public boolean saveToEs() {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
Iterator<Map.Entry<String, MetricData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
MetricData metricData = iterator.next().getValue();
bulkRequest.add(client.prepareIndex(esIndex(), esType(), metricData.getId()).setSource(metricData.toMap()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
return !bulkResponse.hasFailures();
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMember;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class PersistenceMember extends AbstractAsyncMember {
private Logger logger = LogManager.getFormatterLogger(PersistenceMember.class);
public PersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public abstract String esIndex();
public abstract String esType();
public abstract void analyse(Object message) throws Exception;
@Override
public void receive(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
persistence();
} else {
analyse(message);
}
}
protected abstract void persistence();
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class RecordAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(RecordAnalysisMember.class);
private RecordPersistenceData persistenceData = new RecordPersistenceData();
public RecordAnalysisMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
public void setRecord(String id, JsonObject record) throws Exception {
persistenceData.getElseCreate(id).setRecord(record);
if (persistenceData.size() >= WorkerConfig.Analysis.Data.size) {
aggregation();
}
}
public RecordData pushOne() {
if (persistenceData.hasNext()) {
return persistenceData.pushOne();
}
return null;
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import java.util.Iterator;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class RecordPersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(RecordPersistenceMember.class);
protected RecordPersistenceData persistenceData = new RecordPersistenceData();
public RecordPersistenceMember(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof RecordData) {
RecordData recordData = (RecordData) message;
persistenceData.getElseCreate(recordData.getId()).setRecord(recordData.getRecord());
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
} else {
logger.error("message unhandled");
}
}
protected void persistence() {
boolean success = saveToEs();
if (success) {
persistenceData.clear();
}
}
public boolean saveToEs() {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
Iterator<Map.Entry<String, RecordData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
Map.Entry<String, RecordData> recordData = iterator.next();
bulkRequest.add(client.prepareIndex(esIndex(), esType(), recordData.getKey()).setSource(recordData.getValue().getRecord().toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
return !bulkResponse.hasFailures();
}
}
package com.a.eye.skywalking.collector.worker;
import akka.serialization.JSerializer;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* @author pengys5
*/
public class TraceSegmentSerializer extends JSerializer {
@Override
public boolean includeManifest() {
return false;
}
@Override
public int identifier() {
return 0;
}
@Override
public byte[] toBinary(Object o) {
// System.out.println("toBinary");
TraceSegment traceSegment = (TraceSegment) o;
return traceSegment.serialize().toByteArray();
}
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
// System.out.println("fromBinaryJava");
TraceSegment traceSegment = null;
try {
traceSegment = new TraceSegment(SegmentMessage.parseFrom(bytes));
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return traceSegment;
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
/**
* @author pengys5
*/
public class WorkerConfig extends ClusterConfig {
public static class Analysis {
public static class Data {
public static int size = 1000;
}
}
public static class Persistence {
public static class Data {
public static int size = 1000;
}
}
public static class Worker {
public static class TraceSegmentReceiver {
public static int Num = 10;
}
public static class DAGNodeReceiver {
public static int Num = 10;
}
public static class NodeInstanceReceiver {
public static int Num = 10;
}
public static class ResponseCostReceiver {
public static int Num = 10;
}
public static class ResponseSummaryReceiver {
public static int Num = 10;
}
public static class DAGNodeRefReceiver {
public static int Num = 10;
}
}
public static class Queue {
public static class Persistence {
public static class DAGNodePersistence {
public static int Size = 1024;
}
public static class NodeInstancePersistence {
public static int Size = 1024;
}
public static class ResponseCostPersistence {
public static int Size = 1024;
}
public static class ResponseSummaryPersistence {
public static int Size = 1024;
}
public static class DAGNodeRefPersistence {
public static int Size = 1024;
}
}
public static class TraceSegmentRecordAnalysis {
public static int Size = 1024;
}
public static class NodeInstanceAnalysis {
public static int Size = 1024;
}
public static class DAGNodeAnalysis {
public static int Size = 1024;
}
public static class ResponseCostAnalysis {
public static int Size = 1024;
}
public static class ResponseSummaryAnalysis {
public static int Size = 1024;
}
public static class DAGNodeRefAnalysis {
public static int Size = 1024;
}
}
}
package com.a.eye.skywalking.collector.worker.application;
import akka.actor.ActorRef;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.AbstractSyncMember;
import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider;
import com.a.eye.skywalking.collector.worker.application.analysis.DAGNodeAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.NodeInstanceAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseCostAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseSummaryAnalysis;
import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.a.eye.skywalking.trace.tag.Tags;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ApplicationMain extends AbstractSyncMember {
private Logger logger = LogManager.getFormatterLogger(ApplicationMain.class);
private DAGNodeAnalysis dagNodeAnalysis;
private NodeInstanceAnalysis nodeInstanceAnalysis;
private ResponseCostAnalysis responseCostAnalysis;
private ResponseSummaryAnalysis responseSummaryAnalysis;
private TraceSegmentRecordPersistence recordPersistence;
public ApplicationMain(ActorRef actorRef) throws Exception {
super(actorRef);
dagNodeAnalysis = DAGNodeAnalysis.Factory.INSTANCE.createWorker(actorRef);
nodeInstanceAnalysis = NodeInstanceAnalysis.Factory.INSTANCE.createWorker(actorRef);
responseCostAnalysis = ResponseCostAnalysis.Factory.INSTANCE.createWorker(actorRef);
responseSummaryAnalysis = ResponseSummaryAnalysis.Factory.INSTANCE.createWorker(actorRef);
recordPersistence = TraceSegmentRecordPersistence.Factory.INSTANCE.createWorker(actorRef);
}
@Override
public void receive(Object message) throws Exception {
if (message instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
logger.debug("begin translate TraceSegment Object to JsonObject");
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
recordPersistence.beTold(traceSegment);
sendToDAGNodePersistence(traceSegment);
sendToNodeInstanceAnalysis(traceSegment);
sendToResponseCostPersistence(traceSegment);
sendToResponseSummaryPersistence(traceSegment);
}
}
public static class Factory extends AbstractSyncMemberProvider<ApplicationMain> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ApplicationMain.class;
}
}
private void sendToDAGNodePersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
String component = null;
String layer = null;
for (Span span : traceSegment.getTraceSegment().getSpans()) {
if (span.getParentSpanId() == -1) {
component = Tags.COMPONENT.get(span);
layer = Tags.SPAN_LAYER.get(span);
}
}
DAGNodeAnalysis.Metric node = new DAGNodeAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, component, layer);
dagNodeAnalysis.beTold(node);
}
private void sendToNodeInstanceAnalysis(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
TraceSegmentRef traceSegmentRef = traceSegment.getTraceSegment().getPrimaryRef();
if (traceSegmentRef != null && !StringUtil.isEmpty(traceSegmentRef.getApplicationCode())) {
String code = traceSegmentRef.getApplicationCode();
String address = traceSegmentRef.getPeerHost();
NodeInstanceAnalysis.Metric property = new NodeInstanceAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, address);
nodeInstanceAnalysis.beTold(property);
}
}
private void sendToResponseCostPersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
long startTime = -1;
long endTime = -1;
Boolean isError = false;
for (Span span : traceSegment.getTraceSegment().getSpans()) {
if (span.getParentSpanId() == -1) {
startTime = span.getStartTime();
endTime = span.getEndTime();
isError = Tags.ERROR.get(span);
}
}
ResponseCostAnalysis.Metric cost = new ResponseCostAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError, startTime, endTime);
responseCostAnalysis.beTold(cost);
}
private void sendToResponseSummaryPersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
boolean isError = false;
for (Span span : traceSegment.getTraceSegment().getSpans()) {
if (span.getParentSpanId() == -1) {
isError = Tags.ERROR.get(span);
}
}
ResponseSummaryAnalysis.Metric summary = new ResponseSummaryAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError);
responseSummaryAnalysis.beTold(summary);
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeAnalysis.class);
public DAGNodeAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty(DateTools.Time_Slice_Column_Name, metric.getMinute());
propertyJsonObj.addProperty("component", metric.component);
propertyJsonObj.addProperty("layer", metric.layer);
String id = metric.getMinute() + "-" + metric.code;
logger.debug("dag node: %s", propertyJsonObj.toString());
setRecord(id, propertyJsonObj);
} else {
logger.error("message unhandled");
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
tell(DAGNodeReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeAnalysis.class;
}
@Override
public int queueSize() {
return 1024;
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final String component;
private final String layer;
public Metric(long minute, int second, String code, String component, String layer) {
super(minute, second);
this.code = code;
this.component = component;
this.layer = layer;
}
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.NodeInstanceReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstanceAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceAnalysis.class);
public NodeInstanceAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("code", metric.code);
propertyJsonObj.addProperty(DateTools.Time_Slice_Column_Name, metric.getMinute());
propertyJsonObj.addProperty("address", metric.address);
String id = metric.getMinute() + "-" + metric.address;
setRecord(id, propertyJsonObj);
logger.debug("node instance: %s", propertyJsonObj.toString());
} else {
logger.error("message unhandled");
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
tell(NodeInstanceReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<NodeInstanceAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return NodeInstanceAnalysis.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeInstanceAnalysis.Size;
}
}
public static class Metric extends AbstractTimeSlice{
private final String code;
private final String address;
public Metric(long minute, int second, String code, String address) {
super(minute, second);
this.code = code;
this.address = address;
}
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostAnalysis.class);
public ResponseCostAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
long cost = metric.endTime - metric.startTime;
if (cost <= 1000 && !metric.isError) {
String id = metric.getMinute() + "-" + metric.code;
setMetric(id, metric.getSecond(), cost);
}
// logger.debug("response cost metric: %s", data.toString());
}
}
@Override
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
tell(ResponseCostReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneMetric);
}
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseCostAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseCostAnalysis.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.ResponseCostAnalysis.Size;
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final Boolean isError;
private final Long startTime;
private final Long endTime;
public Metric(long minute, int second, String code, Boolean isError, Long startTime, Long endTime) {
super(minute, second);
this.code = code;
this.isError = isError;
this.startTime = startTime;
this.endTime = endTime;
}
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseSummaryReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseSummaryAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryAnalysis.class);
public ResponseSummaryAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
String id = metric.getMinute() + "-" + metric.code;
setMetric(id, metric.getSecond(), 1L);
// logger.debug("response summary metric: %s", data.toString());
}
}
@Override
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
tell(ResponseSummaryReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneMetric);
}
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseSummaryAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseSummaryAnalysis.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.ResponseSummaryAnalysis.Size;
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final Boolean isError;
public Metric(long minute, int second, String code, Boolean isError) {
super(minute, second);
this.code = code;
this.isError = isError;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodePersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodePersistence.class);
public DAGNodePersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application";
}
@Override
public String esType() {
return "dag_node";
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodePersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodePersistence.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstancePersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(NodeInstancePersistence.class);
public NodeInstancePersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application";
}
@Override
public String esType() {
return "node_instance";
}
public static class Factory extends AbstractAsyncMemberProvider<NodeInstancePersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return NodeInstancePersistence.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.NodeInstancePersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostPersistence extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostPersistence.class);
public ResponseCostPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application_metric";
}
@Override
public String esType() {
return "response_cost";
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseCostPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseCostPersistence.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseCostPersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.MetricPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseSummaryPersistence extends MetricPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryPersistence.class);
public ResponseSummaryPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "application_metric";
}
@Override
public String esType() {
return "response_summary";
}
public static class Factory extends AbstractAsyncMemberProvider<ResponseSummaryPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ResponseSummaryPersistence.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.ResponseSummaryPersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentRecordPersistence.class);
@Override
public String esIndex() {
return "application_record";
}
@Override
public String esType() {
return "trace_segment";
}
public TraceSegmentRecordPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
JsonObject jsonObject = parseTraceSegment(traceSegment.getTraceSegment(), traceSegment.getMinute());
RecordData recordData = new RecordData(traceSegment.getTraceSegment().getTraceSegmentId());
recordData.setRecord(jsonObject);
super.analyse(recordData);
}
}
public static class Factory extends AbstractAsyncMemberProvider<TraceSegmentRecordPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
}
@Override
public Class memberClass() {
return TraceSegmentRecordPersistence.class;
}
}
private JsonObject parseTraceSegment(TraceSegment traceSegment, long minute) {
JsonObject traceJsonObj = new JsonObject();
traceJsonObj.addProperty("segmentId", traceSegment.getTraceSegmentId());
traceJsonObj.addProperty(DateTools.Time_Slice_Column_Name, minute);
traceJsonObj.addProperty("startTime", traceSegment.getStartTime());
traceJsonObj.addProperty("endTime", traceSegment.getEndTime());
traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode());
if (traceSegment.getPrimaryRef() != null) {
JsonObject primaryRefJsonObj = parsePrimaryRef(traceSegment.getPrimaryRef());
traceJsonObj.add("primaryRef", primaryRefJsonObj);
}
if (traceSegment.getRefs() != null) {
JsonArray refsJsonArray = parseRefs(traceSegment.getRefs());
traceJsonObj.add("refs", refsJsonArray);
}
JsonArray spanJsonArray = new JsonArray();
for (Span span : traceSegment.getSpans()) {
JsonObject spanJsonObj = parseSpan(span);
spanJsonArray.add(spanJsonObj);
}
traceJsonObj.add("spans", spanJsonArray);
return traceJsonObj;
}
private JsonObject parsePrimaryRef(TraceSegmentRef primaryRef) {
JsonObject primaryRefJsonObj = new JsonObject();
primaryRefJsonObj.addProperty("appCode", primaryRef.getApplicationCode());
primaryRefJsonObj.addProperty("spanId", primaryRef.getSpanId());
primaryRefJsonObj.addProperty("peerHost", primaryRef.getPeerHost());
primaryRefJsonObj.addProperty("segmentId", primaryRef.getTraceSegmentId());
return primaryRefJsonObj;
}
private JsonArray parseRefs(List<TraceSegmentRef> refs) {
JsonArray refsJsonArray = new JsonArray();
for (TraceSegmentRef ref : refs) {
JsonObject refJsonObj = new JsonObject();
refJsonObj.addProperty("spanId", ref.getSpanId());
refJsonObj.addProperty("appCode", ref.getApplicationCode());
refJsonObj.addProperty("segmentId", ref.getTraceSegmentId());
refJsonObj.addProperty("peerHost", ref.getPeerHost());
refsJsonArray.add(refJsonObj);
}
return refsJsonArray;
}
private JsonObject parseSpan(Span span) {
JsonObject spanJsonObj = new JsonObject();
spanJsonObj.addProperty("spanId", span.getSpanId());
spanJsonObj.addProperty("parentSpanId", span.getParentSpanId());
spanJsonObj.addProperty("startTime", span.getStartTime());
spanJsonObj.addProperty("endTime", span.getEndTime());
spanJsonObj.addProperty("operationName", span.getOperationName());
JsonObject tagsJsonObj = parseSpanTag(span.getTags());
spanJsonObj.add("tags", tagsJsonObj);
return spanJsonObj;
}
private JsonObject parseSpanTag(Map<String, Object> tags) {
JsonObject tagsJsonObj = new JsonObject();
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String key = entry.getKey();
String value = String.valueOf(entry.getValue());
tagsJsonObj.addProperty(key, value);
}
return tagsJsonObj;
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.DAGNodePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeReceiver.class);
private DAGNodePersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = DAGNodePersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof RecordData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return DAGNodeReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.NodeInstancePersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class NodeInstanceReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceReceiver.class);
private NodeInstancePersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = NodeInstancePersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof RecordData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return NodeInstanceReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.NodeInstanceReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseCostPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseCostReceiver.class);
private ResponseCostPersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = ResponseCostPersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof MetricData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return ResponseCostReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseCostReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.application.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.persistence.ResponseSummaryPersistence;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseSummaryReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(ResponseSummaryReceiver.class);
private ResponseSummaryPersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = ResponseSummaryPersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof MetricData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return ResponseSummaryReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.ResponseSummaryReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref;
import akka.actor.ActorRef;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.AbstractSyncMember;
import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider;
import com.a.eye.skywalking.collector.worker.applicationref.analysis.DAGNodeRefAnalysis;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.trace.TraceSegmentRef;
/**
* @author pengys5
*/
public class ApplicationRefMain extends AbstractSyncMember {
private DAGNodeRefAnalysis dagNodeRefAnalysis;
public ApplicationRefMain(ActorRef actorRef) throws Throwable {
super(actorRef);
dagNodeRefAnalysis = DAGNodeRefAnalysis.Factory.INSTANCE.createWorker(actorRef);
}
@Override
public void receive(Object message) throws Exception {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
TraceSegmentRef traceSegmentRef = traceSegment.getTraceSegment().getPrimaryRef();
if (traceSegmentRef != null && !StringUtil.isEmpty(traceSegmentRef.getApplicationCode())) {
String front = traceSegmentRef.getApplicationCode();
String behind = traceSegment.getTraceSegment().getApplicationCode();
DAGNodeRefAnalysis.Metric nodeRef = new DAGNodeRefAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), front, behind);
dagNodeRefAnalysis.beTold(nodeRef);
}
}
public static class Factory extends AbstractSyncMemberProvider<ApplicationRefMain> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return ApplicationRefMain.class;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.analysis;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.receiver.DAGNodeRefReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.google.gson.JsonObject;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefAnalysis.class);
public DAGNodeRefAnalysis(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
JsonObject propertyJsonObj = new JsonObject();
propertyJsonObj.addProperty("frontCode", metric.frontCode);
propertyJsonObj.addProperty("behindCode", metric.behindCode);
propertyJsonObj.addProperty(DateTools.Time_Slice_Column_Name, metric.getMinute());
String id = metric.getMinute() + "-" + metric.frontCode + "-" + metric.behindCode;
setRecord(id, propertyJsonObj);
logger.debug("dag node ref: %s", propertyJsonObj.toString());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
tell(DAGNodeRefReceiver.Factory.INSTANCE, HashCodeSelector.INSTANCE, oneRecord);
}
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeRefAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeRefAnalysis.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.DAGNodeRefAnalysis.Size;
}
}
public static class Metric extends AbstractTimeSlice {
private final String frontCode;
private final String behindCode;
public Metric(long minute, int second, String frontCode, String behindCode) {
super(minute, second);
this.frontCode = frontCode;
this.behindCode = behindCode;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.persistence;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefPersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefPersistence.class);
public DAGNodeRefPersistence(RingBuffer<MessageHolder> ringBuffer, ActorRef actorRef) {
super(ringBuffer, actorRef);
}
@Override
public String esIndex() {
return "node_ref";
}
@Override
public String esType() {
return "node_ref";
}
public static class Factory extends AbstractAsyncMemberProvider<DAGNodeRefPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Class memberClass() {
return DAGNodeRefPersistence.class;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
}
}
}
package com.a.eye.skywalking.collector.worker.applicationref.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.applicationref.persistence.DAGNodeRefPersistence;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class DAGNodeRefReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(DAGNodeRefReceiver.class);
private DAGNodeRefPersistence persistence;
@Override
public void preStart() throws Exception {
super.preStart();
persistence = DAGNodeRefPersistence.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof RecordData) {
persistence.beTold(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return DAGNodeRefReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.DAGNodeRefReceiver.Num;
}
}
}
package com.a.eye.skywalking.collector.worker.receiver;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.AbstractWorkerProvider;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.ApplicationMain;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMain;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.TraceSegment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class TraceSegmentReceiver extends AbstractWorker {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class);
private ApplicationMain applicationMain;
private ApplicationRefMain applicationRefMain;
public TraceSegmentReceiver() throws Exception {
applicationMain = ApplicationMain.Factory.INSTANCE.createWorker(getSelf());
applicationRefMain = ApplicationRefMain.Factory.INSTANCE.createWorker(getSelf());
}
@Override
public void receive(Object message) throws Throwable {
if (message instanceof TraceSegment) {
TraceSegment traceSegment = (TraceSegment) message;
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", traceSegment.getTraceSegmentId());
long timeSlice = DateTools.timeStampToTimeSlice(traceSegment.getStartTime());
int second = DateTools.timeStampToSecond(traceSegment.getStartTime());
TraceSegmentTimeSlice segmentTimeSlice = new TraceSegmentTimeSlice(timeSlice, second, traceSegment);
tell(applicationMain, segmentTimeSlice);
tell(applicationRefMain, segmentTimeSlice);
}
}
public static class Factory extends AbstractWorkerProvider {
public static Factory INSTANCE = new Factory();
@Override
public Class workerClass() {
return TraceSegmentReceiver.class;
}
@Override
public int workerNum() {
return WorkerConfig.Worker.TraceSegmentReceiver.Num;
}
}
public static class TraceSegmentTimeSlice extends AbstractTimeSlice {
private final TraceSegment traceSegment;
public TraceSegmentTimeSlice(long timeSliceMinute, int second, TraceSegment traceSegment) {
super(timeSliceMinute, second);
this.traceSegment = traceSegment;
}
public TraceSegment getTraceSegment() {
return traceSegment;
}
}
}
package com.a.eye.skywalking.collector.worker.storage;
/**
* @author pengys5
*/
public abstract class AbstractTimeSlice {
private final long minute;
private final int second;
public AbstractTimeSlice(long minute, int second) {
this.minute = minute;
this.second = second;
}
public long getMinute() {
return minute;
}
public int getSecond() {
return second;
}
}
package com.a.eye.skywalking.collector.worker.storage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* @author pengys5
*/
public class EsClient {
private static Logger logger = LogManager.getFormatterLogger(EsClient.class);
private static Client client;
public static void boot() throws UnknownHostException {
Settings settings = Settings.builder()
.put("cluster.name", "CollectorCluster")
.put("client.transport.sniff", true).build();
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
}
public static Client getClient() {
return client;
}
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MetricData extends AbstractHashMessage {
public MetricData(String key) {
super(key);
this.id = key;
}
private String id;
private static final String s10 = "s10";
private static final String s20 = "s20";
private static final String s30 = "s30";
private static final String s40 = "s40";
private static final String s50 = "s50";
private static final String s60 = "s60";
private Long s10Value = 0L;
private Long s20Value = 0L;
private Long s30Value = 0L;
private Long s40Value = 0L;
private Long s50Value = 0L;
private Long s60Value = 0L;
public void setMetric(int second, Long value) {
if (second <= 10) {
s10Value += value;
} else if (second > 10 && second <= 20) {
s20Value += value;
} else if (second > 20 && second <= 30) {
s30Value += value;
} else if (second > 30 && second <= 40) {
s40Value += value;
} else if (second > 40 && second <= 50) {
s50Value += value;
} else {
s60Value += value;
}
}
public void merge(MetricData metricData) {
s10Value += metricData.s10Value;
s20Value += metricData.s20Value;
s30Value += metricData.s30Value;
s40Value += metricData.s40Value;
s50Value += metricData.s50Value;
s60Value += metricData.s60Value;
}
public void merge(Map<String, Object> dbData) {
s10Value += Long.valueOf(dbData.get(s10).toString());
s20Value += Long.valueOf(dbData.get(s20).toString());
s30Value += Long.valueOf(dbData.get(s30).toString());
s40Value += Long.valueOf(dbData.get(s40).toString());
s50Value += Long.valueOf(dbData.get(s50).toString());
s60Value += Long.valueOf(dbData.get(s60).toString());
}
public Map<String, Long> toMap() {
Map<String, Long> map = new HashMap<>();
map.put(s10, s10Value);
map.put(s20, s20Value);
map.put(s30, s30Value);
map.put(s40, s40Value);
map.put(s50, s50Value);
map.put(s60, s60Value);
return map;
}
public String getId() {
return id;
}
protected Long getS10Value() {
return s10Value;
}
protected Long getS20Value() {
return s20Value;
}
protected Long getS30Value() {
return s30Value;
}
protected Long getS40Value() {
return s40Value;
}
protected Long getS50Value() {
return s50Value;
}
protected Long getS60Value() {
return s60Value;
}
}
package com.a.eye.skywalking.collector.worker.storage;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
/**
* @author pengys5
*/
public class MetricPersistenceData implements Iterable {
private Map<String, MetricData> persistenceData = new HashMap();
public MetricData getElseCreate(String id) {
if (!persistenceData.containsKey(id)) {
persistenceData.put(id, new MetricData(id));
}
return persistenceData.get(id);
}
public int size() {
return persistenceData.size();
}
public void clear() {
persistenceData.clear();
}
public MetricData pushOne() {
MetricData one = persistenceData.entrySet().iterator().next().getValue();
persistenceData.remove(one.getId());
return one;
}
@Override
public void forEach(Consumer action) {
throw new UnsupportedOperationException("forEach");
}
@Override
public Spliterator spliterator() {
throw new UnsupportedOperationException("spliterator");
}
@Override
public Iterator<Map.Entry<String, MetricData>> iterator() {
return persistenceData.entrySet().iterator();
}
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage;
import com.google.gson.JsonObject;
/**
* @author pengys5
*/
public class RecordData extends AbstractHashMessage {
private String id;
private JsonObject record;
public RecordData(String key) {
super(key);
this.id = key;
}
public String getId() {
return id;
}
public JsonObject getRecord() {
return record;
}
public void setRecord(JsonObject record) {
this.record = record;
}
}
package com.a.eye.skywalking.collector.worker.storage;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Consumer;
/**
* @author pengys5
*/
public class RecordPersistenceData implements Iterable {
private Map<String, RecordData> persistenceData = new HashMap();
public RecordData getElseCreate(String id) {
if (!persistenceData.containsKey(id)) {
persistenceData.put(id, new RecordData(id));
}
return persistenceData.get(id);
}
public int size() {
return persistenceData.size();
}
public void clear() {
persistenceData.clear();
}
public boolean hasNext() {
return persistenceData.entrySet().iterator().hasNext();
}
public RecordData pushOne() {
RecordData one = persistenceData.entrySet().iterator().next().getValue();
persistenceData.remove(one.getId());
return one;
}
@Override
public void forEach(Consumer action) {
throw new UnsupportedOperationException("forEach");
}
@Override
public Spliterator spliterator() {
throw new UnsupportedOperationException("spliterator");
}
@Override
public Iterator<Map.Entry<String, RecordData>> iterator() {
return persistenceData.entrySet().iterator();
}
}
package com.a.eye.skywalking.collector.worker.tools;
import java.text.SimpleDateFormat;
import java.util.Calendar;
/**
* @author pengys5
*/
public class DateTools {
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
public static final String Time_Slice_Column_Name = "timeSlice";
public static int timeStampToSecond(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
return calendar.get(Calendar.SECOND);
}
public static long timeStampToTimeSlice(long time) {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
String timeStr = sdf.format(calendar.getTime());
return Long.valueOf(timeStr);
}
}
com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.DAGNodeReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.NodeInstanceReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver$Factory
com.a.eye.skywalking.collector.worker.application.receiver.ResponseSummaryReceiver$Factory
com.a.eye.skywalking.collector.worker.applicationref.receiver.DAGNodeRefReceiver$Factory
\ No newline at end of file
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
data = "com.a.eye.skywalking.collector.worker.TraceSegmentSerializer"
json = "com.a.eye.skywalking.collector.worker.JsonSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
"com.a.eye.skywalking.messages.ISerializable" = data
"com.google.gson.JsonObject" = json
// "java.io.Serializable" = none
}
// serialize-messages = on
warn-about-java-serializer-usage = on
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 1000
}
}
cluster {
auto-down-unreachable-after = off
metrics.enabled = off
}
}
\ No newline at end of file
cluster.current.hostname=127.0.0.1
cluster.current.port=1000
cluster.current.roles=[WorkersListener, TraceSegmentReceiver, NodeInstancePersistence]
cluster.nodes=["akka.tcp://CollectorSystem@127.0.0.1:1000", "akka.tcp://CollectorSystem@127.0.0.1:1001", "akka.tcp://CollectorSystem@127.0.0.1:1002"]
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n" />
</Console>
</Appenders>
<Loggers>
<Root level="error">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.a.eye.skywalking.collector.actor.WorkersCreator;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.a.eye.skywalking.trace.proto.SegmentRefMessage;
import com.a.eye.skywalking.trace.tag.Tags;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
/**
* @author pengys5
*/
public class StartUpTestCase {
public void test() throws Exception {
System.out.println(TraceSegmentReceiver.class.getSimpleName());
ClusterConfigInitializer.initialize("collector.config");
System.out.println(ClusterConfig.Cluster.Current.roles);
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.actor.provider=" + ClusterConfig.Cluster.provider)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.nodes)).
withFallback(ConfigFactory.load("application.conf"));
ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
WorkersCreator.INSTANCE.boot(system);
EsClient.boot();
TraceSegment dubboClientData = TraceSegmentBuilderFactory.INSTANCE.traceOf_Tomcat_DubboClient();
SegmentMessage.Builder clientBuilder = dubboClientData.serialize().toBuilder();
clientBuilder.setApplicationCode("Tomcat_DubboClient");
dubboClientData = new TraceSegment(clientBuilder.build());
TraceSegment dubboServerData = TraceSegmentBuilderFactory.INSTANCE.traceOf_DubboServer_MySQL();
SegmentMessage serializeServer = dubboServerData.serialize();
SegmentMessage.Builder builder = serializeServer.toBuilder();
SegmentRefMessage.Builder builderRef = builder.getPrimaryRef().toBuilder();
builderRef.setApplicationCode(dubboClientData.getApplicationCode());
builderRef.setPeerHost(Tags.PEER_HOST.get(dubboClientData.getSpans().get(1)));
builder.setApplicationCode("DubboServer_MySQL");
builder.setPrimaryRef(builderRef);
dubboServerData = new TraceSegment(builder.build());
Thread.sleep(5000);
ActorSelection selection = system.actorSelection("/user/TraceSegmentReceiver_1");
for (int i = 0; i < 100; i++) {
selection.tell(dubboClientData, ActorRef.noSender());
selection.tell(dubboServerData, ActorRef.noSender());
Thread.sleep(200);
}
Thread.sleep(1000000);
}
}
......@@ -270,6 +270,7 @@ public class Span implements ISerializable<SpanMessage> {
public SpanMessage serialize() {
SpanMessage.Builder builder = SpanMessage.newBuilder();
builder.setSpanId(spanId);
builder.setParentSpanId(parentSpanId);
builder.setStartTime(startTime);
builder.setEndTime(endTime);
builder.setOperationName(operationName);
......@@ -292,6 +293,7 @@ public class Span implements ISerializable<SpanMessage> {
@Override
public void deserialize(SpanMessage message) {
spanId = message.getSpanId();
parentSpanId = message.getParentSpanId();
startTime = message.getStartTime();
endTime = message.getEndTime();
operationName = message.getOperationName();
......
......@@ -189,9 +189,13 @@ public class TraceSegment implements ISerializable<SegmentMessage> {
segmentBuilder.setStartTime(startTime);
segmentBuilder.setEndTime(endTime);
segmentBuilder.setApplicationCode(applicationCode);
segmentBuilder.setPrimaryRef(primaryRef.serialize());
for (TraceSegmentRef ref : refs) {
segmentBuilder.addRefs(ref.serialize());
if(primaryRef != null) {
segmentBuilder.setPrimaryRef(primaryRef.serialize());
}
if(refs != null && refs.size() > 0) {
for (TraceSegmentRef ref : refs) {
segmentBuilder.addRefs(ref.serialize());
}
}
for (Span span : spans) {
segmentBuilder.addSpans(span.serialize());
......@@ -205,9 +209,12 @@ public class TraceSegment implements ISerializable<SegmentMessage> {
startTime = message.getStartTime();
endTime = message.getEndTime();
applicationCode = message.getApplicationCode();
(primaryRef = new TraceSegmentRef()).deserialize(message.getPrimaryRef());
SegmentRefMessage messagePrimaryRef = message.getPrimaryRef();
if(messagePrimaryRef != null) {
(primaryRef = new TraceSegmentRef()).deserialize(messagePrimaryRef);
}
List<SegmentRefMessage> refsList = message.getRefsList();
if (refsList != null) {
if (refsList != null && refsList.size() > 0) {
this.refs = new LinkedList<TraceSegmentRef>();
for (SegmentRefMessage refMessage : refsList) {
TraceSegmentRef ref = new TraceSegmentRef();
......
......@@ -4,12 +4,16 @@ import com.a.eye.skywalking.trace.Span;
/**
* Do the same thing as {@link StringTag}, just with a {@link Boolean} value.
*
* <p>
* Created by wusheng on 2017/2/17.
*/
public class BooleanTag extends AbstractTag<Boolean> {
public BooleanTag(String key) {
private boolean defaultValue;
public BooleanTag(String key, boolean defaultValue) {
super(key);
this.defaultValue = defaultValue;
}
@Override
......@@ -28,9 +32,9 @@ public class BooleanTag extends AbstractTag<Boolean> {
public Boolean get(Span span) {
Object tagValue = span.getTag(super.key);
if (tagValue == null) {
return null;
return defaultValue;
} else if (tagValue instanceof Boolean) {
return (Boolean)tagValue;
return (Boolean) tagValue;
} else {
return Boolean.valueOf(tagValue.toString());
}
......
......@@ -5,7 +5,7 @@ import com.a.eye.skywalking.trace.Span;
/**
* The span tags are supported by sky-walking engine.
* As default, all tags will be stored, but these ones have particular meanings.
*
* <p>
* Created by wusheng on 2017/2/17.
*/
public final class Tags {
......@@ -39,7 +39,7 @@ public final class Tags {
/**
* SPAN_LAYER represents the kind of span.
*
* <p>
* e.g.
* db=database;
* rpc=Remote Procedure Call Framework, like motan, thift;
......@@ -91,7 +91,7 @@ public final class Tags {
/**
* ERROR indicates whether a Span ended in an error state.
*/
public static final BooleanTag ERROR = new BooleanTag("error");
public static final BooleanTag ERROR = new BooleanTag("error", false);
/**
* PEER_HOST records host address (ip:port, or ip1:port1,ip2:port2) of the peer, maybe IPV4, IPV6 or hostname.
......
......@@ -22,11 +22,12 @@ message SegmentRefMessage {
message SpanMessage {
int32 spanId = 1;
int64 startTime = 2;
int64 endTime = 3;
string operationName = 4;
repeated KeyValue tags = 5;
repeated LogDataMessage logs = 6;
int32 parentSpanId = 2;
int64 startTime = 3;
int64 endTime = 4;
string operationName = 5;
repeated KeyValue tags = 6;
repeated LogDataMessage logs = 7;
}
message LogDataMessage {
......
......@@ -8,10 +8,10 @@ import org.junit.Test;
*/
public class ThrowableFormatterTest {
@Test
public void testFormat(){
public void testFormat() {
NullPointerException exception = new NullPointerException();
String formatLines = ThrowableFormatter.format(exception);
String[] lines = formatLines.split("\n");
String[] lines = formatLines.split(System.lineSeparator());
Assert.assertEquals("java.lang.NullPointerException", lines[0]);
Assert.assertEquals("\tat com.a.eye.skywalking.api.logging.ThrowableFormatterTest.testFormat(ThrowableFormatterTest.java:12)", lines[1]);
}
......
......@@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>skywalking-sniffer-mock</artifactId>
<description>This is a sniffer mock module, test for test-dependency, only.</description>
<description>This is a sniffer mock module. Simulate a sniffer, assemble one or more trace segments. Test-dependency, only.</description>
<dependencies>
<dependency>
......
......@@ -2,9 +2,11 @@ package com.a.eye.skywalking.sniffer.mock.trace;
import com.a.eye.skywalking.api.context.TracerContext;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.trace.builders.trace.DubboServerMysqlTraceBuilder;
import com.a.eye.skywalking.sniffer.mock.trace.builders.trace.SingleTomcat200TraceBuilder;
import com.a.eye.skywalking.sniffer.mock.trace.builders.trace.SingleTomcat404TraceBuilder;
import com.a.eye.skywalking.sniffer.mock.trace.builders.trace.SingleTomcat500TraceBuilder;
import com.a.eye.skywalking.sniffer.mock.trace.builders.trace.TomcatDubboClientTraceBuilder;
import com.a.eye.skywalking.trace.TraceSegment;
/**
......@@ -19,7 +21,6 @@ public enum TraceSegmentBuilderFactory {
/**
* @see {@link SingleTomcat200TraceBuilder}
*
* @return
*/
public TraceSegment singleTomcat200Trace(){
return this.build(SingleTomcat200TraceBuilder.INSTANCE);
......@@ -28,7 +29,6 @@ public enum TraceSegmentBuilderFactory {
/**
* @see {@link SingleTomcat404TraceBuilder}
*
* @return
*/
public TraceSegment singleTomcat404Trace(){
return this.build(SingleTomcat404TraceBuilder.INSTANCE);
......@@ -37,12 +37,25 @@ public enum TraceSegmentBuilderFactory {
/**
* @see {@link SingleTomcat500TraceBuilder}
*
* @return
*/
public TraceSegment singleTomcat500Trace(){
return this.build(SingleTomcat500TraceBuilder.INSTANCE);
}
/**
* @see {@link TomcatDubboClientTraceBuilder}
*/
public TraceSegment traceOf_Tomcat_DubboClient(){
return this.build(TomcatDubboClientTraceBuilder.INSTANCE);
}
/**
* @see {@link DubboServerMysqlTraceBuilder}
*/
public TraceSegment traceOf_DubboServer_MySQL() {
return this.build(DubboServerMysqlTraceBuilder.INSTANCE);
}
private TraceSegment build(TraceSegmentBuilder builder){
MockTracerContextListener listener = new MockTracerContextListener();
try{
......
package com.a.eye.skywalking.sniffer.mock.trace.builders.span;
import com.a.eye.skywalking.api.context.ContextManager;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.tag.Tags;
/**
* The <code>DubboSpanGenerator</code> generates all possible spans, by tracing Dubbo rpc.
* Including client/server side span.
*
* @author wusheng
*/
public class DubboSpanGenerator {
public static class Client extends SpanGeneration{
@Override protected void before() {
Span span = ContextManager.INSTANCE.createSpan("/default_rpc/com.a.eye.skywalking.test.persistence.PersistenceService.query");
Tags.COMPONENT.set(span, "Dubbo");
Tags.URL.set(span, "rest://192.168.1.8:20880/default_rpc/com.a.eye.skywalking.test.persistence.PersistenceService.query(String)");
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_SERVER);
Tags.PEER_HOST.set(span, "192.168.1.8");
Tags.PEER_PORT.set(span, 20880);
Tags.SPAN_LAYER.asHttp(span);
}
@Override protected void after() {
ContextManager.INSTANCE.stopSpan();
}
}
public static class Server extends SpanGeneration{
@Override protected void before() {
Span span = ContextManager.INSTANCE.createSpan("/default_rpc/com.a.eye.skywalking.test.persistence.PersistenceService.query");
Tags.COMPONENT.set(span, "Dubbo");
Tags.URL.set(span, "rest://192.168.1.8:20880/default_rpc/com.a.eye.skywalking.test.persistence.PersistenceService.query(String)");
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);
Tags.PEER_HOST.set(span, "10.21.9.35");
Tags.SPAN_LAYER.asHttp(span);
}
@Override protected void after() {
ContextManager.INSTANCE.stopSpan();
}
}
}
package com.a.eye.skywalking.sniffer.mock.trace.builders.span;
import com.a.eye.skywalking.api.context.ContextManager;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.tag.Tags;
/**
* The <code>MySQLGenerator</code> generates all possible spans, by tracing mysql client access.
*
* @author wusheng
*/
public class MySQLGenerator {
public static class Query extends SpanGeneration {
@Override protected void before() {
Span span = ContextManager.INSTANCE.createSpan("mysql/jdbi/statement/executeQuery");
Tags.COMPONENT.set(span, "Mysql");
Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_CLIENT);
Tags.PEER_HOST.set(span, "10.5.34.18");
Tags.PEER_PORT.set(span, 30088);
Tags.DB_INSTANCE.set(span, "mysql-instance");
Tags.DB_STATEMENT.set(span, "select * from users where user_id = 1");
Tags.DB_TYPE.set(span, "sql");
Tags.SPAN_LAYER.asDB(span);
}
@Override protected void after() {
ContextManager.INSTANCE.stopSpan();
}
}
}
package com.a.eye.skywalking.sniffer.mock.trace.builders.span;
/**
* Created by wusheng on 2017/2/28.
* The <code>SpanGeneration</code> implementations can generate several kinds of spans.
*
* @author wusheng
*/
public abstract class SpanGeneration {
private SpanGeneration next;
private SpanGeneration[] next;
public SpanGeneration build(SpanGeneration next){
this.next = next;
this.next = new SpanGeneration[]{next};
return next;
}
public SpanGeneration build(){
return this;
public void build(SpanGeneration... next){
this.next = next;
}
protected abstract void before();
......@@ -22,7 +24,9 @@ public abstract class SpanGeneration {
public void generate(){
this.before();
if(next != null){
next.generate();
for (SpanGeneration generation : next) {
generation.generate();
}
}
this.after();
}
......
......@@ -15,9 +15,12 @@ public class TomcatSpanGenerator{
@Override protected void before() {
Span webSpan = ContextManager.INSTANCE.createSpan("/web/serviceA");
Tags.COMPONENT.set(webSpan, "tomcat");
Tags.COMPONENT.set(webSpan, "Tomcat");
Tags.URL.set(webSpan, "http://10.21.9.35/web/serviceA");
Tags.SPAN_KIND.set(webSpan, Tags.SPAN_KIND_SERVER);
Tags.PEER_HOST.set(webSpan, "10.21.9.35");
Tags.PEER_PORT.set(webSpan, 80);
Tags.SPAN_LAYER.asHttp(webSpan);
}
@Override protected void after() {
......@@ -32,9 +35,12 @@ public class TomcatSpanGenerator{
@Override protected void before() {
Span webSpan = ContextManager.INSTANCE.createSpan("/web/service/unknown");
Tags.COMPONENT.set(webSpan, "tomcat");
Tags.COMPONENT.set(webSpan, "Tomcat");
Tags.URL.set(webSpan, "http://10.21.9.35/web/unknown");
Tags.SPAN_KIND.set(webSpan, Tags.SPAN_KIND_SERVER);
Tags.PEER_HOST.set(webSpan, "10.21.9.35");
Tags.PEER_PORT.set(webSpan, 80);
Tags.SPAN_LAYER.asHttp(webSpan);
}
@Override protected void after() {
......@@ -49,9 +55,12 @@ public class TomcatSpanGenerator{
@Override protected void before() {
Span webSpan = ContextManager.INSTANCE.createSpan("/web/error/service");
Tags.COMPONENT.set(webSpan, "tomcat");
Tags.COMPONENT.set(webSpan, "Tomcat");
Tags.URL.set(webSpan, "http://10.21.9.35/web/error/service");
Tags.SPAN_KIND.set(webSpan, Tags.SPAN_KIND_SERVER);
Tags.PEER_HOST.set(webSpan, "10.21.9.35");
Tags.PEER_PORT.set(webSpan, 80);
Tags.SPAN_LAYER.asHttp(webSpan);
}
@Override protected void after() {
......
package com.a.eye.skywalking.sniffer.mock.trace.builders.trace;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilder;
import com.a.eye.skywalking.sniffer.mock.trace.builders.span.DubboSpanGenerator;
import com.a.eye.skywalking.sniffer.mock.trace.builders.span.MySQLGenerator;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author wusheng
*/
public enum DubboServerMysqlTraceBuilder implements TraceSegmentBuilder {
INSTANCE;
@Override public TraceSegment build(MockTracerContextListener listener) {
DubboSpanGenerator.Server rootSpan = new DubboSpanGenerator.Server();
rootSpan.build(new MySQLGenerator.Query());
rootSpan.generate();
return listener.getFinished(0);
}
}
......@@ -14,7 +14,7 @@ public enum SingleTomcat200TraceBuilder implements TraceSegmentBuilder {
INSTANCE;
@Override public TraceSegment build(MockTracerContextListener listener) {
TomcatSpanGenerator.ON200.INSTANCE.build().generate();
TomcatSpanGenerator.ON200.INSTANCE.generate();
return listener.getFinished(0);
}
}
......@@ -14,7 +14,7 @@ public enum SingleTomcat404TraceBuilder implements TraceSegmentBuilder {
INSTANCE;
@Override public TraceSegment build(MockTracerContextListener listener) {
TomcatSpanGenerator.ON404.INSTANCE.build().generate();
TomcatSpanGenerator.ON404.INSTANCE.generate();
return listener.getFinished(0);
}
}
......@@ -14,7 +14,7 @@ public enum SingleTomcat500TraceBuilder implements TraceSegmentBuilder {
INSTANCE;
@Override public TraceSegment build(MockTracerContextListener listener) {
TomcatSpanGenerator.ON500.INSTANCE.build().generate();
TomcatSpanGenerator.ON500.INSTANCE.generate();
return listener.getFinished(0);
}
}
package com.a.eye.skywalking.sniffer.mock.trace.builders.trace;
import com.a.eye.skywalking.sniffer.mock.context.MockTracerContextListener;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilder;
import com.a.eye.skywalking.sniffer.mock.trace.builders.span.DubboSpanGenerator;
import com.a.eye.skywalking.sniffer.mock.trace.builders.span.TomcatSpanGenerator;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* A Trace segment contains two spans with ChildOf relations,
* the parent is a Tomcat span,
* the child is a Dubbo client span.
*
* @author wusheng
*/
public enum TomcatDubboClientTraceBuilder implements TraceSegmentBuilder {
INSTANCE;
@Override public TraceSegment build(MockTracerContextListener listener) {
TomcatSpanGenerator.ON200 rootSpan = new TomcatSpanGenerator.ON200();
rootSpan.build(new DubboSpanGenerator.Client());
rootSpan.generate();
return listener.getFinished(0);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册