Commit b95de632 authored by wu-sheng, committed by GitHub

Merge pull request #119 from wu-sheng/feature/3.0

Merge this preview milestone. 
Sky Walking
==========
<img src="docs/resources/skywalking.png" alt="Sky Walking logo" height="90px" align="right" />
<img src="https://sky-walking.github.io/page-resources/3.0/skywalking.png" alt="Sky Walking logo" height="90px" align="right" />
SkyWalking: Large-Scale Distributed Systems Tracing Infrastructure, also known as a Distributed Tracer.
......@@ -13,81 +13,46 @@ SkyWalking: Large-Scale Distributed Systems Tracing Infrastructure, also known D
[![OpenTracing-1.0 Badge](https://img.shields.io/badge/OpenTracing--1.0-enabled-blue.svg)](http://opentracing.io)
[![Release Version](https://img.shields.io/badge/stable-2.3--2017-brightgreen.svg)](https://github.com/wu-sheng/sky-walking/releases)
# News
* sky-walking v3.0 iteration begins... The top 2 important features are: [`Update the trace-structure`](https://github.com/wu-sheng/sky-walking/issues/83) and [`Analyze trace, and bring metric/analytic/cause up`](https://github.com/wu-sheng/sky-walking/issues/84)
* See the feature code at [branch feature/3.0](https://github.com/wu-sheng/sky-walking/tree/feature/3.0)
* The new UI is released at [wu-sheng/sky-walking-ui](https://github.com/wu-sheng/sky-walking-ui)
# Abstract
* An open source Large-Scale Distributed Systems Tracing Infrastructure, also known as a distributed tracer.
* Based on [Google Dapper Paper: Dapper, a Large-Scale Distributed Systems Tracing Infrastructure](http://research.google.com/pubs/pub36356.html); a [Simplified Chinese version](http://duanple.blog.163.com/blog/static/70971767201329113141336/) is also available.
* Supports popular RPC frameworks, such as [dubbo](https://github.com/alibaba/dubbo), [dubbox](https://github.com/dangdangdotcom/dubbox), [motan](https://github.com/weibocom/motan) etc., and triggers email alerts when an application throws an unexpected exception.
* Auto-instrumentation mechanism, **no need to CHANGE any application source code**.
* Easy to deploy, **even in production mode** (since 2.0). No need of a Hadoop, HBase, or Cassandra cluster.
* Pure Java server implementation; provides gRPC (since 2.0) and HTTP (since 2.1) cross-platform span collecting services.
# Supported components
### web containers
* Tomcat 7
* Tomcat 8
### databases
* mysql
* oracle
* h2
* easily extended to support Sybase, SQL Server, jTDS, DB2, Informix
### rpc frameworks
* dubbo
* dubbox
* httpClient
* motan
### cache
* jedis
### opentracing supported frameworks
* motan
* hprose-java
# Contributors
* 吴晟 [wusheng](https://github.com/wu-sheng) &nbsp;&nbsp;wu.sheng@foxmail.com
* 张鑫 [zhangxin](https://github.com/ascrutae) &nbsp;&nbsp;
_Chinese articles about sky-walking and distributed tracers_
<img src="docs/resources/toutiao.JPG" alt="Sky Walking TouTiao" height="280px" />
_If you are a Chinese developer, you can join QQ group 392443393; please tag **Sky-Walking**._
* Auto-instrumentation mechanism, **no need to CHANGE any application source code**. Supported [frameworks](https://github.com/wu-sheng/sky-walking/wiki/3.0-supported-list).
* Pure Java server implementation, with RESTful Web service.
* High performance streaming analysis.
* The UI is released at [wu-sheng/sky-walking-ui](https://github.com/wu-sheng/sky-walking-ui)
___
<a href="https://github.com/wu-sheng/sky-walking">
<img src="docs/resources/oneapm-award.png" alt="OneAPM Open Source Achievement Award" height="110px" align="left" />
<img src="https://sky-walking.github.io/page-resources/3.0/oneapm-award.png" alt="OneAPM Open Source Achievement Award" height="110px" align="left" />
</a>
In October 2016, Sky Walking won the `OneAPM Open Source Achievement Award`. The award recognizes sky-walking's "*contribution to the popularization of APM technology*". <br/>
[OneAPM](http://www.oneapm.com/) will provide financial support for the project in 2016-2017.<br/><br/>
[OneAPM](http://www.oneapm.com/) provides financial support for the project in 2016-2017.<br/><br/>
Thanks to all users of the sky-walking project.
___
# Contributors
_In chronological order_
* 吴晟 [@wu-sheng](https://github.com/wu-sheng)
* 张鑫 [@ascrutae](https://github.com/ascrutae)
* 谭真 [@mircoteam](https://github.com/mircoteam)
* 徐妍 [@TastySummer](https://github.com/TastySummer)
* 彭勇升 [@pengys5](https://github.com/pengys5)
* 戴文
# Quick View
* Distributed trace
![Trace diagram 1](docs/resources/callChain.png?1=1)
![Trace diagram 2](docs/resources/callChainDetail.png?1=1)
# Screenshots
- Topological graph of application clusters.
<img src="https://sky-walking.github.io/page-resources/3.0/topological_graph.png"/>
![Trace diagram 3](docs/resources/callChainLog.png?1=1)
- Trace query.
<img src="https://sky-walking.github.io/page-resources/3.0/trace_segment.png"/>
* alarm mail
- Span detail.
<img src="https://sky-walking.github.io/page-resources/3.0/span.png" />
![Alarm mail](docs/resources/alarmMail.png?1=1)
# Document
* [WIKI](https://github.com/wu-sheng/sky-walking/wiki)
_Chat with us on gitter, in English. If you are a Chinese developer, you can join QQ group 392443393; please tag **Sky-Walking**._
......@@ -24,6 +24,21 @@
<name>Zhang Xin</name>
<url>https://github.com/ascrutae</url>
</developer>
<developer>
<name>Tan Zhen</name>
<url>https://github.com/mircoteam</url>
</developer>
<developer>
<name>Xu Yan</name>
<url>https://github.com/TastySummer</url>
</developer>
<developer>
<name>Peng Yongsheng</name>
<url>https://github.com/pengys5</url>
</developer>
<developer>
<name>Dai Wen</name>
</developer>
</developers>
<modules>
......
......@@ -94,6 +94,9 @@ public class SkyWalkingSpanBuilder implements Tracer.SpanBuilder {
@Override
public Span start() {
if (startTime == 0){
startTime = System.currentTimeMillis();
}
return new SkyWalkingSpan(this.operationName, this.startTime, this.tags);
}
......
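The `start()` method above defaults an unset start time to the current clock. The same pattern can be sketched standalone; `SpanBuilderSketch` and its method names are hypothetical illustrations, not part of the sky-walking API:

```java
// Hypothetical sketch of the start-time defaulting in start():
// a value of 0 means "no explicit timestamp was given".
class SpanBuilderSketch {
    private long startTime;

    SpanBuilderSketch withStartTimestamp(long timestampMillis) {
        this.startTime = timestampMillis;
        return this;
    }

    long start() {
        if (startTime == 0) {
            startTime = System.currentTimeMillis(); // default to "now"
        }
        return startTime;
    }
}
```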
......@@ -17,6 +17,7 @@
<properties>
<akka.version>2.4.17</akka.version>
<log4j.version>2.8.1</log4j.version>
</properties>
<dependencies>
......@@ -26,9 +27,14 @@
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
<version>${project.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
......
......@@ -2,14 +2,15 @@ package com.a.eye.skywalking.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ServiceLoader;
......@@ -17,7 +18,8 @@ import java.util.ServiceLoader;
* @author pengys5
*/
public class CollectorSystem {
private ILog logger = LogManager.getLogger(CollectorSystem.class);
private Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);
private ClusterWorkerContext clusterContext;
public LookUp getClusterContext() {
......@@ -36,14 +38,13 @@ public class CollectorSystem {
}
private void createAkkaSystem() {
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes)).
withFallback(ConfigFactory.load("application.conf"));
ActorSystem akkaSystem = ActorSystem.create("ClusterSystem", config);
if (!StringUtil.isEmpty(ClusterConfig.Cluster.seed_nodes)) {
    // Config is immutable; withFallback returns a new instance, so the
    // result must be reassigned or it is silently discarded.
    config = config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes));
}
ActorSystem akkaSystem = ActorSystem.create(Const.SystemName, config);
clusterContext = new ClusterWorkerContext(akkaSystem);
}
......@@ -66,6 +67,7 @@ public class CollectorSystem {
private void loadLocalProviders() throws UsedRoleNameException {
ServiceLoader<AbstractLocalWorkerProvider> clusterServiceLoader = ServiceLoader.load(AbstractLocalWorkerProvider.class);
for (AbstractLocalWorkerProvider provider : clusterServiceLoader) {
logger.info("loadLocalProviders provider name: %s", provider.getClass().getName());
provider.setClusterContext(clusterContext);
clusterContext.putProvider(provider);
}
......
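`ServiceLoader.load` above discovers provider implementations through a `META-INF/services` descriptor on the classpath. A provider module would ship a file like the following; the descriptor file name is the service interface's fully qualified name (inferred from the imports above), and the listed implementation class is illustrative:

```
# src/main/resources/META-INF/services/com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider
com.example.MyLocalWorkerProvider
```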
......@@ -7,20 +7,27 @@ import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class AbstractClusterWorker extends AbstractWorker {
public AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final public void allocateJob(Object message) throws Exception {
onWork(message);
}
protected abstract void onWork(Object message) throws Exception;
static class WorkerWithAkka extends UntypedActor {
private static ILog logger = LogManager.getLogger(WorkerWithAkka.class);
private Logger logger = LogManager.getFormatterLogger(WorkerWithAkka.class);
private Cluster cluster;
private final AbstractClusterWorker ownerWorker;
......@@ -59,7 +66,7 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
register(memberUp.member());
} else {
logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
ownerWorker.work(message);
ownerWorker.allocateJob(message);
}
}
......
......@@ -17,7 +17,7 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
T clusterWorker = (T) workerInstance(getClusterContext());
clusterWorker.preStart();
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role() + "_" + num);
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role().roleName() + "_" + num);
ClusterWorkerRef workerRef = new ClusterWorkerRef(actorRef, role());
getClusterContext().put(workerRef);
......
......@@ -14,6 +14,16 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
}
final public void allocateJob(Object request) throws Exception {
onWork(request);
}
protected abstract void onWork(Object request) throws Exception;
static class WorkerWithDisruptor implements EventHandler<MessageHolder> {
private RingBuffer<MessageHolder> ringBuffer;
......@@ -28,9 +38,10 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
try {
Object message = event.getMessage();
event.reset();
asyncWorker.work(message);
asyncWorker.allocateJob(message);
if (endOfBatch) {
asyncWorker.work(new EndOfBatchCommand());
asyncWorker.allocateJob(new EndOfBatchCommand());
}
} catch (Exception e) {
e.printStackTrace();
......
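The `endOfBatch` flag drives a buffer-then-flush pattern: every event is handed to the worker, and an `EndOfBatchCommand` tells it to flush pending work at the batch boundary. A self-contained sketch of that pattern, with no disruptor dependency and hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the end-of-batch pattern: messages are buffered
// per event and the whole batch is flushed when endOfBatch is raised.
class BatchingHandler {
    private final List<Object> buffer = new ArrayList<>();
    final List<List<Object>> flushed = new ArrayList<>();

    void onEvent(Object message, boolean endOfBatch) {
        buffer.add(message);
        if (endOfBatch) {
            flushed.add(new ArrayList<>(buffer)); // flush the batch
            buffer.clear();
        }
    }
}
```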
......@@ -8,9 +8,13 @@ public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
super(role, clusterContext, selfContext);
}
@Override
final public void work(Object message) throws Exception {
final public void allocateJob(Object request, Object response) throws Exception {
onWork(request, response);
}
public abstract Object onWork(Object message) throws Exception;
protected abstract void onWork(Object request, Object response) throws Exception;
@Override
public void preStart() throws ProviderNotFoundException {
}
}
......@@ -18,8 +18,6 @@ public abstract class AbstractWorker {
public abstract void preStart() throws ProviderNotFoundException;
public abstract void work(Object message) throws Exception;
final public LookUp getSelfContext() {
return selfContext;
}
......
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -11,7 +11,8 @@ import java.util.concurrent.ConcurrentHashMap;
* @author pengys5
*/
public class ClusterWorkerContext extends WorkerContext {
private ILog logger = LogManager.getLogger(ClusterWorkerContext.class);
private Logger logger = LogManager.getFormatterLogger(ClusterWorkerContext.class);
private final ActorSystem akkaSystem;
private Map<String, AbstractWorkerProvider> providers = new ConcurrentHashMap<>();
......
......@@ -14,6 +14,10 @@ public class LocalSyncWorkerRef extends WorkerRef {
@Override
public void tell(Object message) throws Exception {
localSyncWorker.work(message);
localSyncWorker.allocateJob(message, null);
}
public void ask(Object request, Object response) throws Exception {
localSyncWorker.allocateJob(request, response);
}
}
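The `ask` path above implements a synchronous request/response call by letting the caller pass a mutable response object that the worker fills in. A minimal standalone sketch of that contract; `EchoWorker` is a hypothetical name:

```java
// Hypothetical worker illustrating the fill-in-the-response contract of
// AbstractLocalSyncWorker.allocateJob(request, response).
class EchoWorker {
    void allocateJob(Object request, StringBuilder response) {
        response.append("echo:").append(request); // worker writes the reply
    }
}
```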
package com.a.eye.skywalking.collector.actor;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
......
package com.a.eye.skywalking.collector.actor;
/**
* @author pengys5
*/
public class Promise {
private boolean isTold = false;
private Object value;
protected void completed(Object value) {
this.value = value;
isTold = true;
}
public boolean isTold() {
return isTold;
}
}
......@@ -10,12 +10,20 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public abstract class WorkerContext implements Context {
private Map<String, List<WorkerRef>> roleWorkers = new ConcurrentHashMap<>();
private Map<String, List<WorkerRef>> roleWorkers;
public WorkerContext() {
this.roleWorkers = new ConcurrentHashMap<>();
}
private Map<String, List<WorkerRef>> getRoleWorkers() {
return this.roleWorkers;
}
@Override
final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
if (roleWorkers.containsKey(role.roleName())) {
WorkerRefs refs = new WorkerRefs(roleWorkers.get(role.roleName()), role.workerSelector());
if (getRoleWorkers().containsKey(role.roleName())) {
WorkerRefs refs = new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector());
return refs;
} else {
throw new WorkerNotFoundException("role=" + role.roleName() + ", no available worker.");
......@@ -24,14 +32,14 @@ public abstract class WorkerContext implements Context {
@Override
final public void put(WorkerRef workerRef) {
if (!roleWorkers.containsKey(workerRef.getRole().roleName())) {
roleWorkers.putIfAbsent(workerRef.getRole().roleName(), new ArrayList<WorkerRef>());
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<WorkerRef>());
}
roleWorkers.get(workerRef.getRole().roleName()).add(workerRef);
getRoleWorkers().get(workerRef.getRole().roleName()).add(workerRef);
}
@Override
final public void remove(WorkerRef workerRef) {
roleWorkers.remove(workerRef.getRole().roleName());
getRoleWorkers().remove(workerRef.getRole().roleName());
}
}
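The `put` above performs a non-atomic contains/putIfAbsent/get sequence; on a `ConcurrentHashMap` the same registration can be written atomically with `computeIfAbsent`. A standalone sketch of that alternative (names hypothetical, `String` standing in for `WorkerRef`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: computeIfAbsent makes the check-then-insert step atomic.
class RoleRegistrySketch {
    private final Map<String, List<String>> roleWorkers = new ConcurrentHashMap<>();

    void put(String roleName, String workerRef) {
        roleWorkers.computeIfAbsent(roleName, k -> new ArrayList<>()).add(workerRef);
    }

    List<String> lookup(String roleName) {
        return roleWorkers.get(roleName);
    }
}
```

Note the `ArrayList` values themselves are still not thread-safe under concurrent `add`; a `CopyOnWriteArrayList` would close that gap.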
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import java.util.List;
/**
......@@ -11,7 +11,7 @@ import java.util.List;
*/
public class WorkerRefs<T extends WorkerRef> {
private static ILog logger = LogManager.getLogger(WorkerRefs.class);
private Logger logger = LogManager.getFormatterLogger(WorkerRefs.class);
private List<T> workerRefs;
private WorkerSelector workerSelector;
......@@ -25,4 +25,13 @@ public class WorkerRefs<T extends WorkerRef> {
logger.debug("WorkerSelector instance of %s", workerSelector.getClass());
workerSelector.select(workerRefs, message).tell(message);
}
public void ask(Object request, Object response) throws Exception {
WorkerRef workerRef = workerSelector.select(workerRefs, request);
if (workerRef instanceof LocalSyncWorkerRef) {
((LocalSyncWorkerRef) workerRef).ask(request, response);
} else {
throw new IllegalAccessError("only local sync worker can ask");
}
}
}
......@@ -6,7 +6,7 @@ package com.a.eye.skywalking.collector.cluster;
{@link Cluster.Current#port} is the port the server binds to.
{@link Cluster.Current#roles} lists the roles of the workers to be
created in this process.
* {@link Cluster#seed_nodes} is a seed_nodes which cluster have.
{@link Cluster#seed_nodes} is the comma-separated list of the cluster's seed nodes, e.g. seed_nodes = "ip:port,ip:port".
*
* @author pengys5
*/
......@@ -19,6 +19,6 @@ public class ClusterConfig {
public static String roles = "";
}
public static String seed_nodes = "127.0.0.1:2551";
public static String seed_nodes = "";
}
}
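Given the defaults above, a `collector.config` would supply the seed nodes as a comma-separated `ip:port` list. The property keys below are inferred from the `ClusterConfig` class layout and are an assumption, not a documented format:

```
# collector.config (illustrative)
cluster.current.hostname=127.0.0.1
cluster.current.port=2551
cluster.current.roles=WorkersListener
cluster.seed_nodes=127.0.0.1:2551,127.0.0.2:2551
```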
......@@ -2,8 +2,9 @@ package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.api.util.ConfigInitializer;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.util.Properties;
......@@ -20,7 +21,7 @@ import java.util.Properties;
*/
public class ClusterConfigInitializer {
private static ILog logger = LogManager.getLogger(ClusterConfigInitializer.class);
private static Logger logger = LogManager.getFormatterLogger(ClusterConfigInitializer.class);
public static final String ConfigFileName = "collector.config";
......
package com.a.eye.skywalking.collector.cluster;
/**
* @author pengys5
*/
public class Const {
public static final String SystemName = "ClusterSystem";
}
......@@ -5,10 +5,10 @@ import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.ClusterWorkerRef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.Map;
......@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class WorkersListener extends UntypedActor {
private ILog logger = LogManager.getLogger(WorkersListener.class);
private Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
public static final String WorkName = "WorkersListener";
......@@ -52,7 +52,7 @@ public class WorkersListener extends UntypedActor {
if (message instanceof WorkerListenerMessage.RegisterMessage) {
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
ActorRef sender = getSender();
// logger.info("register worker of role: %s, path: %s", register.getWorkRole(), sender.toString());
logger.info("register worker of role: %s, path: %s", register.getRole().roleName(), sender.toString());
ClusterWorkerRef workerRef = new ClusterWorkerRef(sender, register.getRole());
relation.put(sender, workerRef);
clusterContext.put(new ClusterWorkerRef(sender, register.getRole()));
......
package com.a.eye.skywalking.collector.actor;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
/**
* @author pengys5
*/
public class AbstractClusterWorkerTestCase {
@Test
public void testAllocateJob() throws Exception {
AbstractClusterWorker worker = PowerMockito.mock(AbstractClusterWorker.class);
String jobStr = "TestJob";
worker.allocateJob(jobStr);
Mockito.verify(worker).onWork(jobStr);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.RingBuffer;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
/**
* @author pengys5
*/
public class AbstractLocalAsyncWorkerTestCase {
@Test
public void testAllocateJob() throws Exception {
AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
String message = "Test";
worker.allocateJob(message);
verify(worker).onWork(message);
}
@Test
public void testOnEventWhenNotEnd() throws Exception {
AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
AbstractLocalAsyncWorker.WorkerWithDisruptor disruptor = new AbstractLocalAsyncWorker.WorkerWithDisruptor(null, worker);
MessageHolder holder = new MessageHolder();
String message = "Test";
holder.setMessage(message);
disruptor.onEvent(holder, 0, false);
verify(worker).onWork(message);
}
@Test
public void testOnEventWhenEnd() throws Exception {
AbstractLocalAsyncWorker worker = mock(AbstractLocalAsyncWorker.class);
AbstractLocalAsyncWorker.WorkerWithDisruptor disruptor = new AbstractLocalAsyncWorker.WorkerWithDisruptor(null, worker);
MessageHolder holder = new MessageHolder();
String message = "Test";
holder.setMessage(message);
disruptor.onEvent(holder, 0, true);
verify(worker, times(1)).onWork(message);
verify(worker, times(1)).onWork(argThat(new IsEndOfBatchCommandClass()));
}
class IsEndOfBatchCommandClass extends ArgumentMatcher<EndOfBatchCommand> {
public boolean matches(Object para) {
return para.getClass() == EndOfBatchCommand.class;
}
}
}
package com.a.eye.skywalking.collector.actor;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.powermock.api.mockito.PowerMockito.*;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({AbstractWorker.class})
public class AbstractWorkerProviderTestCase {
@Test(expected = IllegalArgumentException.class)
public void testNullWorkerInstanceCreate() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
when(provider.workerInstance(null)).thenReturn(null);
AbstractWorker worker = mock(AbstractWorker.class);
provider.create(worker);
}
@Test
public void testNoneWorkerOwner() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
ClusterWorkerContext context = mock(ClusterWorkerContext.class);
provider.setClusterContext(context);
AbstractWorker worker = mock(AbstractWorker.class);
when(provider.workerInstance(context)).thenReturn(worker);
provider.create(null);
Mockito.verify(provider).onCreate(null);
}
@Test
public void testHasWorkerOwner() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
ClusterWorkerContext context = mock(ClusterWorkerContext.class);
provider.setClusterContext(context);
AbstractWorker worker = mock(AbstractWorker.class);
when(provider.workerInstance(context)).thenReturn(worker);
AbstractWorker workerOwner = mock(AbstractWorker.class);
LocalWorkerContext localWorkerContext = mock(LocalWorkerContext.class);
when(workerOwner.getSelfContext()).thenReturn(localWorkerContext);
provider.create(workerOwner);
Mockito.verify(provider).onCreate(localWorkerContext);
}
@Test(expected = IllegalArgumentException.class)
public void testHasWorkerOwnerButNoneContext() throws ProviderNotFoundException {
AbstractWorkerProvider provider = mock(AbstractWorkerProvider.class);
ClusterWorkerContext context = mock(ClusterWorkerContext.class);
provider.setClusterContext(context);
AbstractWorker worker = mock(AbstractWorker.class);
when(provider.workerInstance(context)).thenReturn(worker);
AbstractWorker workerOwner = mock(AbstractWorker.class);
when(workerOwner.getSelfContext()).thenReturn(null);
provider.create(workerOwner);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestClusterWorker extends AbstractClusterWorker {
public TestClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(TestLocalSyncWorker.TestLocalSyncWorkerRole.INSTANCE).create(this);
getClusterContext().findProvider(TestLocalAsyncWorker.TestLocalASyncWorkerRole.INSTANCE).create(this);
}
@Override
public void work(Object message) throws Exception {
if (message.equals("Print")) {
System.out.println(message);
} else if (message.equals("TellLocalWorker")) {
System.out.println(message);
getSelfContext().lookup(TestLocalSyncWorker.TestLocalSyncWorkerRole.INSTANCE).tell(message);
} else if (message.equals("TellLocalAsyncWorker")) {
System.out.println(message);
getSelfContext().lookup(TestLocalAsyncWorker.TestLocalASyncWorkerRole.INSTANCE).tell(message);
} else {
System.out.println("unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<TestClusterWorker> {
@Override
public int workerNum() {
return 5;
}
@Override
public Role role() {
return TestClusterWorkerRole.INSTANCE;
}
@Override
public TestClusterWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestClusterWorker(role(), clusterContext, new LocalWorkerContext());
}
}
public enum TestClusterWorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestClusterWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.CollectorSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class TestClusterWorkerTestCase {
private CollectorSystem collectorSystem;
// @Before
public void createSystem() throws Exception {
collectorSystem = new CollectorSystem();
collectorSystem.boot();
}
// @Before
public void terminateSystem() {
collectorSystem.terminate();
}
// @Test
public void testTellWorker() throws Exception {
WorkerRefs workerRefs = collectorSystem.getClusterContext().lookup(TestClusterWorker.TestClusterWorkerRole.INSTANCE);
workerRefs.tell("Print");
workerRefs.tell("TellLocalWorker");
workerRefs.tell("TellLocalAsyncWorker");
Thread.sleep(5000);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestLocalAsyncWorker extends AbstractLocalAsyncWorker {
public TestLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
}
@Override
public void work(Object message) throws Exception {
if (message.equals("TellLocalAsyncWorker")) {
System.out.println("hello async!");
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<TestLocalAsyncWorker> {
@Override
public int queueSize() {
return 1024;
}
@Override
public Role role() {
return TestLocalASyncWorkerRole.INSTANCE;
}
@Override
public TestLocalAsyncWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestLocalAsyncWorker(role(), clusterContext, new LocalWorkerContext());
}
}
public enum TestLocalASyncWorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestLocalAsyncWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestLocalSyncWorker extends AbstractLocalSyncWorker {
public TestLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
}
@Override
public Object onWork(Object message) throws Exception {
if (message.equals("TellLocalWorker")) {
System.out.println("hello! ");
} else {
System.out.println("unhandled");
}
return "Hello";
}
public static class Factory extends AbstractLocalSyncWorkerProvider<TestLocalSyncWorker> {
@Override
public Role role() {
return TestLocalSyncWorkerRole.INSTANCE;
}
@Override
public TestLocalSyncWorker workerInstance(ClusterWorkerContext clusterContext) {
return new TestLocalSyncWorker(role(), clusterContext, new LocalWorkerContext());
}
}
public enum TestLocalSyncWorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return TestLocalSyncWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.CollectorSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class TestLocalSyncWorkerTestCase {
@Before
public void createSystem() throws Exception {
}
@After
public void terminateSystem() {
}
@Test
public void testTellWorker() throws Exception {
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
/**
* @author pengys5
*/
public class HashCodeSelectorTestCase {
@Test
public void testSelect() {
List<WorkerRef> members = new ArrayList<>();
WorkerRef workerRef_1 = mock(WorkerRef.class);
WorkerRef workerRef_2 = mock(WorkerRef.class);
WorkerRef workerRef_3 = mock(WorkerRef.class);
members.add(workerRef_1);
members.add(workerRef_2);
members.add(workerRef_3);
AbstractHashMessage message_1 = mock(AbstractHashMessage.class);
when(message_1.getHashCode()).thenReturn(9);
AbstractHashMessage message_2 = mock(AbstractHashMessage.class);
when(message_2.getHashCode()).thenReturn(10);
AbstractHashMessage message_3 = mock(AbstractHashMessage.class);
when(message_3.getHashCode()).thenReturn(11);
HashCodeSelector selector = new HashCodeSelector();
WorkerRef select_1 = selector.select(members, message_1);
Assert.assertEquals(workerRef_1.hashCode(), select_1.hashCode());
WorkerRef select_2 = selector.select(members, message_2);
Assert.assertEquals(workerRef_2.hashCode(), select_2.hashCode());
WorkerRef select_3 = selector.select(members, message_3);
Assert.assertEquals(workerRef_3.hashCode(), select_3.hashCode());
}
}
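The expectations in this test imply selection by `hashCode % size`. A sketch consistent with those expectations; the real `HashCodeSelector` takes an `AbstractHashMessage`, so the plain `int` parameter here is a simplification:

```java
import java.util.List;

// Simplified selector: picks a member by modulo of the message hash,
// so hash 9 over 3 members yields index 0, hash 10 index 1, and so on.
class HashCodeSelectorSketch<T> {
    T select(List<T> members, int hashCode) {
        return members.get(hashCode % members.size());
    }
}
```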
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.powermock.api.mockito.PowerMockito.*;
/**
* @author pengys5
*/
public class RollingSelectorTestCase {
@Test
public void testSelect() {
List<WorkerRef> members = new ArrayList<>();
WorkerRef workerRef_1 = mock(WorkerRef.class);
WorkerRef workerRef_2 = mock(WorkerRef.class);
WorkerRef workerRef_3 = mock(WorkerRef.class);
members.add(workerRef_1);
members.add(workerRef_2);
members.add(workerRef_3);
Object message = new Object();
RollingSelector selector = new RollingSelector();
WorkerRef selected_1 = selector.select(members, message);
Assert.assertEquals(workerRef_2.hashCode(), selected_1.hashCode());
WorkerRef selected_2 = selector.select(members, message);
Assert.assertEquals(workerRef_3.hashCode(), selected_2.hashCode());
WorkerRef selected_3 = selector.select(members, message);
Assert.assertEquals(workerRef_1.hashCode(), selected_3.hashCode());
}
}
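Note that the test above expects the first call to return the second member, which implies the rolling index is advanced before each pick rather than after. A minimal sketch of that behavior, with hypothetical names (not the project's API):

```java
import java.util.List;

// Illustrative sketch only: a round-robin selector whose counter is
// incremented before each pick, which is why RollingSelectorTestCase
// expects the *second* member on the first call.
public class RollingSelectorSketch {
    private int index = 0;

    public <T> T select(List<T> members) {
        index++;
        if (index >= members.size()) {
            index = 0;
        }
        return members.get(index);
    }
}
```

With three members this yields the sequence member 2, member 3, member 1, matching the three assertions in the test.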
package com.a.eye.skywalking.collector.queue;
import org.junit.Test;
/**
* @author pengys5
*/
public class QueueTestCase {
@Test
public void testProducer() throws InterruptedException {
}
}
package com.a.eye.skywalking.collector.commons.config;
/**
* @author pengys5
*/
public enum SeedNodesFormatter {
INSTANCE;
public String formatter(String seedNodes) {
return null;
}
}
@@ -5,14 +5,14 @@ akka {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
TraceSegment = "com.a.eye.skywalking.collector.worker.TraceSegmentSerializer"
// TraceSegment = "com.a.eye.skywalking.collector.worker.TraceSegmentSerializer"
json = "com.a.eye.skywalking.collector.commons.serializer.JsonSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
"com.a.eye.skywalking.trace.TraceSegment" = TraceSegment
// "com.a.eye.skywalking.trace.TraceSegment" = TraceSegment
"com.google.gson.JsonObject" = json
}
@@ -29,5 +29,6 @@ akka {
cluster {
auto-down-unreachable-after = off
metrics.enabled = off
roles = ["WorkersListener"]
}
}
@@ -12,6 +12,10 @@
<artifactId>skywalking-collector-worker</artifactId>
<packaging>jar</packaging>
<properties>
<jetty.version>9.4.2.v20170220</jetty.version>
</properties>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
@@ -20,18 +24,18 @@
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-log4j2</artifactId>
<artifactId>skywalking-collector-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-collector-commons</artifactId>
<version>${project.version}</version>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.nanohttpd</groupId>
<artifactId>nanohttpd</artifactId>
<version>2.3.1</version>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
......
@@ -20,15 +20,19 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
@Override
public void preStart() throws ProviderNotFoundException {
}
@Override
public void work(Object message) throws Exception {
final public void onWork(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
analyse(message);
try {
analyse(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
......
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.CollectorSystem;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.logging.log4j2.Log4j2Resolver;
import com.a.eye.skywalking.collector.worker.httpserver.HttpServer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.IndexCreator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class CollectorBootStartUp {
/**
* TODO pengys5, make the exception clear.
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
LogManager.setLogResolver(new Log4j2Resolver());
private static Logger logger = LogManager.getFormatterLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws Exception {
logger.info("collector system starting....");
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes)).
withFallback(ConfigFactory.load("application.conf"));
// ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
// WorkersCreator.INSTANCE.boot(system);
HttpServer.INSTANCE.boot();
// EsClient.boot();
CollectorSystem collectorSystem = new CollectorSystem();
collectorSystem.boot();
EsClient.boot();
// IndexCreator.INSTANCE.create();
HttpServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
}
}
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public class Const {
public static final String ID_SPLIT = "..-..";
public static final String IDS_SPLIT = "\\.\\.-\\.\\.";
public static final String PEERS_FRONT_SPLIT = "[";
public static final String PEERS_BEHIND_SPLIT = "]";
public static final String USER_CODE = "User";
public static final String RESULT = "result";
}
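The pair of constants above is easy to misread: `ID_SPLIT` is the literal separator placed between id parts, while `IDS_SPLIT` is its regex-escaped form, needed because `String.split` treats its argument as a regular expression and an unescaped `.` would match any character. A small round-trip using the same two values (the demo class name is illustrative):

```java
// Demonstrates why Const keeps both a literal separator and its
// regex-escaped twin: String.split takes a regex, so the dots in
// "..-.." must be escaped to split correctly.
public class IdSplitDemo {
    static final String ID_SPLIT = "..-..";
    static final String IDS_SPLIT = "\\.\\.-\\.\\.";

    public static void main(String[] args) {
        String id = "appA" + ID_SPLIT + "appB";   // join with the literal form
        String[] parts = id.split(IDS_SPLIT);     // split with the escaped form
        System.out.println(parts.length);         // 2
        System.out.println(parts[0]);             // appA
    }
}
```

Splitting on the unescaped `"..-.."` instead would treat each `.` as a wildcard and produce the wrong parts.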
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
/**
* @author pengys5
*/
public abstract class MergeAnalysisMember extends AnalysisMember {
private MergePersistenceData persistenceData;
protected MergeAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
persistenceData = new MergePersistenceData();
}
private MergePersistenceData getPersistenceData() {
return persistenceData;
}
final protected void setMergeData(String id, String column, String value) throws Exception {
getPersistenceData().getElseCreate(id).setMergeData(column, value);
if (getPersistenceData().size() >= WorkerConfig.Persistence.Data.size) {
aggregation();
}
}
final public MergeData pushOne() {
if (getPersistenceData().iterator().hasNext()) {
return getPersistenceData().pushOne();
}
return null;
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.Client;
import java.util.Iterator;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class MergePersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(MergePersistenceMember.class);
private MergePersistenceData persistenceData = new MergePersistenceData();
protected MergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
final public void analyse(Object message) throws Exception {
if (message instanceof MergeData) {
MergeData mergeData = (MergeData) message;
persistenceData.getElseCreate(mergeData.getId()).merge(mergeData);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
} else {
logger.error("message unhandled");
}
}
final protected void persistence() {
MultiGetResponse multiGetResponse = searchFromEs();
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
if (response != null && response.isExists()) {
persistenceData.getElseCreate(response.getId()).merge(response.getSource());
}
}
boolean success = saveToEs();
if (success) {
persistenceData.clear();
}
}
private MultiGetResponse searchFromEs() {
Client client = EsClient.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
Iterator<Map.Entry<String, MergeData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
multiGetRequestBuilder.add(esIndex(), esType(), iterator.next().getKey());
}
return multiGetRequestBuilder.get();
}
private boolean saveToEs() {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
Iterator<Map.Entry<String, MergeData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
MergeData mergeData = iterator.next().getValue();
bulkRequest.add(client.prepareIndex(esIndex(), esType(), mergeData.getId()).setSource(mergeData.toMap()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
return !bulkResponse.hasFailures();
}
}
package com.a.eye.skywalking.collector.worker;
import akka.actor.ActorRef;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import com.a.eye.skywalking.collector.worker.storage.MetricPersistenceData;
import com.lmax.disruptor.RingBuffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public abstract class MetricAnalysisMember extends AnalysisMember {
private Logger logger = LogManager.getFormatterLogger(MetricAnalysisMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
private MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
public void setMetric(String id, int second, Long value) throws Exception {
persistenceData.getElseCreate(id).setMetric(second, value);
final protected void setMetric(String id, String column, Long value) throws Exception {
persistenceData.getElseCreate(id).setMetric(column, value);
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
aggregation();
}
}
public MetricData pushOne() {
final public MetricData pushOne() {
if (persistenceData.iterator().hasNext()) {
return persistenceData.pushOne();
}
......
@@ -26,14 +26,14 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(MetricPersistenceMember.class);
protected MetricPersistenceData persistenceData = new MetricPersistenceData();
private MetricPersistenceData persistenceData = new MetricPersistenceData();
public MetricPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
final public void analyse(Object message) throws Exception {
if (message instanceof MetricData) {
MetricData metricData = (MetricData) message;
persistenceData.getElseCreate(metricData.getId()).merge(metricData);
@@ -45,7 +45,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
}
}
protected void persistence() {
final protected void persistence() {
MultiGetResponse multiGetResponse = searchFromEs();
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
@@ -60,11 +60,12 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
}
}
public MultiGetResponse searchFromEs() {
private MultiGetResponse searchFromEs() {
Client client = EsClient.getClient();
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
Iterator<Map.Entry<String, MetricData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
multiGetRequestBuilder.add(esIndex(), esType(), iterator.next().getKey());
}
@@ -73,7 +74,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
return multiGetResponse;
}
public boolean saveToEs() {
private boolean saveToEs() {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
......
@@ -23,12 +23,12 @@ public abstract class PersistenceMember extends AbstractLocalAsyncWorker {
public abstract void analyse(Object message) throws Exception;
@Override
public void preStart() throws ProviderNotFoundException {
final public void preStart() throws ProviderNotFoundException {
}
@Override
public void work(Object message) throws Exception {
final protected void onWork(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
persistence();
} else {
......
@@ -25,14 +25,14 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
super(role, clusterContext, selfContext);
}
public void setRecord(String id, JsonObject record) throws Exception {
final public void setRecord(String id, JsonObject record) throws Exception {
persistenceData.getElseCreate(id).setRecord(record);
if (persistenceData.size() >= WorkerConfig.Analysis.Data.size) {
aggregation();
}
}
public RecordData pushOne() {
final public RecordData pushOne() {
if (persistenceData.hasNext()) {
return persistenceData.pushOne();
}
......
@@ -22,18 +22,24 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
private Logger logger = LogManager.getFormatterLogger(RecordPersistenceMember.class);
protected RecordPersistenceData persistenceData = new RecordPersistenceData();
private RecordPersistenceData persistenceData;
public RecordPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
persistenceData = new RecordPersistenceData();
}
private RecordPersistenceData getPersistenceData() {
return this.persistenceData;
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof RecordData) {
RecordData recordData = (RecordData) message;
persistenceData.getElseCreate(recordData.getId()).setRecord(recordData.getRecord());
if (persistenceData.size() >= WorkerConfig.Persistence.Data.size) {
logger.debug("setRecord: id: %s, data: %s", recordData.getId(), recordData.getRecord());
getPersistenceData().getElseCreate(recordData.getId()).setRecord(recordData.getRecord());
if (getPersistenceData().size() >= WorkerConfig.Persistence.Data.size) {
persistence();
}
} else {
@@ -44,23 +50,27 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
protected void persistence() {
boolean success = saveToEs();
if (success) {
persistenceData.clear();
getPersistenceData().clear();
}
}
public boolean saveToEs() {
private boolean saveToEs() {
Client client = EsClient.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
logger.debug("persistenceData size: %s", getPersistenceData().size());
Iterator<Map.Entry<String, RecordData>> iterator = persistenceData.iterator();
Iterator<Map.Entry<String, RecordData>> iterator = getPersistenceData().iterator();
while (iterator.hasNext()) {
Map.Entry<String, RecordData> recordData = iterator.next();
logger.debug("saveToEs: key: %s, data: %s", recordData.getKey(), recordData.getValue().getRecord().toString());
bulkRequest.add(client.prepareIndex(esIndex(), esType(), recordData.getKey()).setSource(recordData.getValue().getRecord().toString()));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
logger.error(bulkResponse.buildFailureMessage());
}
return !bulkResponse.hasFailures();
}
}
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public abstract class TimeSlice {
private String sliceType;
private long startTime;
private long endTime;
public TimeSlice(String sliceType, long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
this.sliceType = sliceType;
}
public String getSliceType() {
return sliceType;
}
public long getStartTime() {
return startTime;
}
public long getEndTime() {
return endTime;
}
}
@@ -45,7 +45,88 @@ public class WorkerConfig extends ClusterConfig {
}
}
public static class WorkerNum {
public static class Node {
public static class NodeDayAgg {
public static int Value = 10;
}
public static class NodeHourAgg {
public static int Value = 10;
}
public static class NodeMinuteAgg {
public static int Value = 10;
}
public static class NodeMappingDayAgg {
public static int Value = 10;
}
public static class NodeMappingHourAgg {
public static int Value = 10;
}
public static class NodeMappingMinuteAgg {
public static int Value = 10;
}
}
}
public static class Queue {
public static class Segment {
public static class SegmentCostSave {
public static int Size = 1024;
}
public static class SegmentSave {
public static int Size = 1024;
}
public static class SegmentExceptionSave {
public static int Size = 1024;
}
}
public static class Node {
public static class NodeDayAnalysis {
public static int Size = 1024;
}
public static class NodeHourAnalysis {
public static int Size = 1024;
}
public static class NodeMinuteAnalysis {
public static int Size = 1024;
}
public static class NodeRefDayAnalysis {
public static int Size = 1024;
}
public static class NodeRefHourAnalysis {
public static int Size = 1024;
}
public static class NodeRefMinuteAnalysis {
public static int Size = 1024;
}
public static class NodeMappingDayAnalysis {
public static int Size = 1024;
}
public static class NodeMappingHourAnalysis {
public static int Size = 1024;
}
public static class NodeMappingMinuteAnalysis {
public static int Size = 1024;
}
}
public static class Persistence {
public static class DAGNodePersistence {
public static int Size = 1024;
@@ -92,6 +173,5 @@ public class WorkerConfig extends ClusterConfig {
public static class DAGNodeRefAnalysis {
public static int Size = 1024;
}
}
}
package com.a.eye.skywalking.collector.worker.application;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.application.analysis.DAGNodeAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.NodeInstanceAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseCostAnalysis;
import com.a.eye.skywalking.collector.worker.application.analysis.ResponseSummaryAnalysis;
import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.a.eye.skywalking.trace.tag.Tags;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ApplicationMain extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(ApplicationMain.class);
public ApplicationMain(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(DAGNodeAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(NodeInstanceAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(ResponseCostAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(ResponseSummaryAnalysis.Role.INSTANCE).create(this);
getClusterContext().findProvider(TraceSegmentRecordPersistence.Role.INSTANCE).create(this);
}
@Override
public Object onWork(Object message) throws Exception {
if (message instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
logger.debug("begin translate TraceSegment Object to JsonObject");
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
getSelfContext().lookup(TraceSegmentRecordPersistence.Role.INSTANCE).tell(traceSegment);
sendToDAGNodePersistence(traceSegment);
sendToNodeInstanceAnalysis(traceSegment);
sendToResponseCostPersistence(traceSegment);
sendToResponseSummaryPersistence(traceSegment);
}
return null;
}
public static class Factory extends AbstractLocalSyncWorkerProvider<ApplicationMain> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public ApplicationMain workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationMain(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ApplicationMain.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
private void sendToDAGNodePersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
String component = null;
String layer = null;
for (Span span : traceSegment.getTraceSegment().getSpans()) {
if (span.getParentSpanId() == -1) {
component = Tags.COMPONENT.get(span);
layer = Tags.SPAN_LAYER.get(span);
}
}
DAGNodeAnalysis.Metric node = new DAGNodeAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, component, layer);
getSelfContext().lookup(DAGNodeAnalysis.Role.INSTANCE).tell(node);
}
private void sendToNodeInstanceAnalysis(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
TraceSegment segment = traceSegment.getTraceSegment();
List<TraceSegmentRef> refs = segment.getRefs();
if (refs != null) {
for (TraceSegmentRef ref : refs) {
String code = segment.getApplicationCode();
String address = ref.getPeerHost();
NodeInstanceAnalysis.Metric property = new NodeInstanceAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, address);
getSelfContext().lookup(NodeInstanceAnalysis.Role.INSTANCE).tell(property);
}
}
}
private void sendToResponseCostPersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
long startTime = -1;
long endTime = -1;
Boolean isError = false;
for (Span span : traceSegment.getTraceSegment().getSpans()) {
if (span.getParentSpanId() == -1) {
startTime = span.getStartTime();
endTime = span.getEndTime();
isError = Tags.ERROR.get(span);
}
}
ResponseCostAnalysis.Metric cost = new ResponseCostAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError, startTime, endTime);
getSelfContext().lookup(ResponseCostAnalysis.Role.INSTANCE).tell(cost);
}
private void sendToResponseSummaryPersistence(TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment) throws Exception {
String code = traceSegment.getTraceSegment().getApplicationCode();
boolean isError = false;
for (Span span : traceSegment.getTraceSegment().getSpans()) {
if (span.getParentSpanId() == -1) {
isError = Tags.ERROR.get(span);
}
}
ResponseSummaryAnalysis.Metric summary = new ResponseSummaryAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), code, isError);
getSelfContext().lookup(ResponseSummaryAnalysis.Role.INSTANCE).tell(summary);
}
}
package com.a.eye.skywalking.collector.worker.application.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MetricAnalysisMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.application.receiver.ResponseCostReceiver;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.storage.MetricData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class ResponseCostAnalysis extends MetricAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(ResponseCostAnalysis.class);
public ResponseCostAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof Metric) {
Metric metric = (Metric) message;
long cost = metric.endTime - metric.startTime;
if (cost <= 1000 && !metric.isError) {
String id = metric.getMinute() + "-" + metric.code;
setMetric(id, metric.getSecond(), cost);
}
// logger.debug("response cost metric: %s", data.toString());
}
}
@Override
protected void aggregation() throws Exception {
MetricData oneMetric;
while ((oneMetric = pushOne()) != null) {
getClusterContext().lookup(ResponseCostReceiver.Role.INSTANCE).tell(oneMetric);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ResponseCostAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public ResponseCostAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new ResponseCostAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.ResponseCostAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ResponseCostAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
public static class Metric extends AbstractTimeSlice {
private final String code;
private final Boolean isError;
private final Long startTime;
private final Long endTime;
public Metric(long minute, int second, String code, Boolean isError, Long startTime, Long endTime) {
super(minute, second);
this.code = code;
this.isError = isError;
this.startTime = startTime;
this.endTime = endTime;
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class TraceSegmentRecordPersistence extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentRecordPersistence.class);
@Override
public String esIndex() {
return "application_record";
}
@Override
public String esType() {
return "trace_segment";
}
public TraceSegmentRecordPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
JsonObject jsonObject = parseTraceSegment(traceSegment.getTraceSegment(), traceSegment.getMinute());
RecordData recordData = new RecordData(traceSegment.getTraceSegment().getTraceSegmentId());
recordData.setRecord(jsonObject);
super.analyse(recordData);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<TraceSegmentRecordPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
}
@Override
public TraceSegmentRecordPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new TraceSegmentRecordPersistence(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return TraceSegmentRecordPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
private JsonObject parseTraceSegment(TraceSegment traceSegment, long minute) {
JsonObject traceJsonObj = new JsonObject();
traceJsonObj.addProperty("segmentId", traceSegment.getTraceSegmentId());
traceJsonObj.addProperty(DateTools.Time_Slice_Column_Name, minute);
traceJsonObj.addProperty("startTime", traceSegment.getStartTime());
traceJsonObj.addProperty("endTime", traceSegment.getEndTime());
traceJsonObj.addProperty("appCode", traceSegment.getApplicationCode());
if (traceSegment.getRefs() != null) {
JsonArray refsJsonArray = parseRefs(traceSegment.getRefs());
traceJsonObj.add("refs", refsJsonArray);
}
JsonArray spanJsonArray = new JsonArray();
for (Span span : traceSegment.getSpans()) {
JsonObject spanJsonObj = parseSpan(span);
spanJsonArray.add(spanJsonObj);
}
traceJsonObj.add("spans", spanJsonArray);
return traceJsonObj;
}
private JsonArray parseRefs(List<TraceSegmentRef> refs) {
JsonArray refsJsonArray = new JsonArray();
for (TraceSegmentRef ref : refs) {
JsonObject refJsonObj = new JsonObject();
refJsonObj.addProperty("spanId", ref.getSpanId());
refJsonObj.addProperty("appCode", ref.getApplicationCode());
refJsonObj.addProperty("segmentId", ref.getTraceSegmentId());
refJsonObj.addProperty("peerHost", ref.getPeerHost());
refsJsonArray.add(refJsonObj);
}
return refsJsonArray;
}
private JsonObject parseSpan(Span span) {
JsonObject spanJsonObj = new JsonObject();
spanJsonObj.addProperty("spanId", span.getSpanId());
spanJsonObj.addProperty("parentSpanId", span.getParentSpanId());
spanJsonObj.addProperty("startTime", span.getStartTime());
spanJsonObj.addProperty("endTime", span.getEndTime());
spanJsonObj.addProperty("operationName", span.getOperationName());
JsonObject tagsJsonObj = parseSpanTag(span.getTags());
spanJsonObj.add("tags", tagsJsonObj);
return spanJsonObj;
}
private JsonObject parseSpanTag(Map<String, Object> tags) {
JsonObject tagsJsonObj = new JsonObject();
for (Map.Entry<String, Object> entry : tags.entrySet()) {
String key = entry.getKey();
String value = String.valueOf(entry.getValue());
tagsJsonObj.addProperty(key, value);
}
return tagsJsonObj;
}
}
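The persistence worker above stringifies every span tag (`String.valueOf` in `parseSpanTag`), so non-string tag values such as booleans or numbers lose their type on the way into the stored JSON. A minimal stand-alone sketch of that flattening rule, using only `java.util` (no Gson dependency; `TagFlattenSketch` is a hypothetical name):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TagFlattenSketch {
    // Mirrors parseSpanTag: every tag value is stringified with String.valueOf
    // before it is written into the span's JSON document.
    static Map<String, String> flatten(Map<String, Object> tags) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : tags.entrySet()) {
            out.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> tags = new LinkedHashMap<>();
        tags.put("span.kind", "server");
        tags.put("error", true);       // Boolean becomes the string "true"
        tags.put("status_code", 200);  // Integer becomes the string "200"
        System.out.println(flatten(tags));
    }
}
```

A consequence worth keeping in mind: any query against these documents has to compare against `"true"` or `"200"` as strings, not as typed values.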
package com.a.eye.skywalking.collector.worker.applicationref;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.applicationref.analysis.DAGNodeRefAnalysis;
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import java.util.List;
/**
* @author pengys5
*/
public class ApplicationRefMain extends AbstractLocalSyncWorker {
private DAGNodeRefAnalysis dagNodeRefAnalysis;
public ApplicationRefMain(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(DAGNodeRefAnalysis.Role.INSTANCE).create(this);
}
@Override
public Object onWork(Object message) throws Exception {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
TraceSegment segment = traceSegment.getTraceSegment();
List<TraceSegmentRef> refs = segment.getRefs();
if(refs != null){
for (TraceSegmentRef ref : refs) {
String front = ref.getApplicationCode();
String behind = segment.getApplicationCode();
DAGNodeRefAnalysis.Metric nodeRef = new DAGNodeRefAnalysis.Metric(traceSegment.getMinute(), traceSegment.getSecond(), front, behind);
getSelfContext().lookup(DAGNodeRefAnalysis.Role.INSTANCE).tell(nodeRef);
}
}
return null;
}
public static class Factory extends AbstractLocalSyncWorkerProvider<ApplicationRefMain> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public ApplicationRefMain workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationRefMain(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return ApplicationRefMain.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
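`ApplicationRefMain` turns each incoming segment reference into a directed edge for the application DAG: the referring application (`front`, the caller) points at the segment's own application (`behind`, the callee). A self-contained sketch of that edge extraction, with hypothetical stand-in types (`Ref`, `Edge`) replacing `TraceSegmentRef` and the analysis metric:

```java
import java.util.ArrayList;
import java.util.List;

public class RefEdgeSketch {
    // Hypothetical stand-ins for TraceSegmentRef and the (front, behind) metric.
    record Ref(String applicationCode) {}
    record Edge(String front, String behind) {}

    // Mirrors ApplicationRefMain.onWork: one edge per segment reference,
    // pointing from the upstream (referring) app to this segment's app.
    static List<Edge> edges(String segmentAppCode, List<Ref> refs) {
        List<Edge> result = new ArrayList<>();
        if (refs != null) {
            for (Ref ref : refs) {
                result.add(new Edge(ref.applicationCode(), segmentAppCode));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edge> e = edges("order-service", List.of(new Ref("web-portal")));
        System.out.println(e); // one edge: web-portal -> order-service
    }
}
```

As in the original, a segment with no refs (a trace entry point) contributes no edges.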
package com.a.eye.skywalking.collector.worker.globaltrace;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceSearchWithGlobalId;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGet;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractGetProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class GlobalTraceGetWithGlobalId extends AbstractGet {
private Logger logger = LogManager.getFormatterLogger(GlobalTraceGetWithGlobalId.class);
private GlobalTraceGetWithGlobalId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("globalId")) {
throw new IllegalArgumentException("the request parameter must contain globalId");
}
logger.debug("globalId: %s", Arrays.toString(request.get("globalId")));
String globalId = ParameterTools.INSTANCE.toString(request, "globalId");
getSelfContext().lookup(GlobalTraceSearchWithGlobalId.WorkerRole.INSTANCE).ask(globalId, response);
}
public static class Factory extends AbstractGetProvider<GlobalTraceGetWithGlobalId> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public GlobalTraceGetWithGlobalId workerInstance(ClusterWorkerContext clusterContext) {
return new GlobalTraceGetWithGlobalId(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/globalTrace/globalId";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return GlobalTraceGetWithGlobalId.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.globaltrace;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
/**
* @author pengys5
*/
public class GlobalTraceIndex extends AbstractIndex {
public static final String Index = "global_trace_idx";
public static final String SubSegIds = "subSegIds";
@Override
public String index() {
return Index;
}
@Override
public boolean isRecord() {
return true;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(SubSegIds)
.field("type", "text")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergeAnalysisMember;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.TraceId.DistributedTraceId;
import com.a.eye.skywalking.trace.TraceSegment;
import java.util.List;
/**
 * @author pengys5
 */
public class GlobalTraceAnalysis extends MergeAnalysisMember {
private GlobalTraceAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
String subSegmentId = segment.getTraceSegmentId();
List<DistributedTraceId> globalTraceIdList = segment.getRelatedGlobalTraces();
if (CollectionTools.isNotEmpty(globalTraceIdList)) {
for (DistributedTraceId disTraceId : globalTraceIdList) {
String traceId = disTraceId.get();
setMergeData(traceId, GlobalTraceIndex.SubSegIds, subSegmentId);
}
}
}
}
@Override
protected void aggregation() throws Exception {
MergeData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(GlobalTraceAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTraceAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public GlobalTraceAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new GlobalTraceAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return 1024;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return GlobalTraceAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.entity;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public class TreeNode {
private String spanId;
private List<TreeNode> childNodes;
public TreeNode(String spanId) {
this.spanId = spanId;
childNodes = new ArrayList<>();
}
public void addChild(TreeNode childNode) {
childNodes.add(childNode);
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * @author pengys5
 */
public class GlobalTraceAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(GlobalTraceAgg.class);
private GlobalTraceAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(GlobalTraceSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof MergeData) {
getSelfContext().lookup(GlobalTraceSave.Role.INSTANCE).tell(message);
} else {
logger.error("message unhandled");
}
}
public static class Factory extends AbstractClusterWorkerProvider<GlobalTraceAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public GlobalTraceAgg workerInstance(ClusterWorkerContext clusterContext) {
return new GlobalTraceAgg(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return GlobalTraceAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergePersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
/**
 * @author pengys5
 */
public class GlobalTraceSave extends MergePersistenceMember {
private GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return GlobalTraceIndex.Index;
}
@Override
public String esType() {
return GlobalTraceIndex.Type_Record;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTraceSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public int queueSize() {
return WorkerConfig.Queue.TraceSegmentRecordAnalysis.Size;
}
@Override
public GlobalTraceSave workerInstance(ClusterWorkerContext clusterContext) {
return new GlobalTraceSave(role(), clusterContext, new LocalWorkerContext());
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return GlobalTraceSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.logic.Segment;
import com.a.eye.skywalking.collector.worker.segment.logic.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.segment.logic.SpanView;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* @author pengys5
*/
public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(this.getClass());
private Gson gson = new Gson();
public GlobalTraceSearchWithGlobalId(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof String) {
Client client = EsClient.getClient();
String globalId = (String) request;
String globalTraceData = client.prepareGet(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, globalId).get().getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SubSegIds).getAsString();
String[] subSegIds = subSegIdsStr.split(MergeData.Split);
List<SpanView> spanViewList = new ArrayList<>();
for (String subSegId : subSegIds) {
logger.debug("subSegId: %s", subSegId);
String segmentSource = client.prepareGet(SegmentIndex.Index, SegmentIndex.Type_Record, subSegId).get().getSourceAsString();
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
String segmentId = segment.getTraceSegmentId();
List<TraceSegmentRef> refsList = segment.getRefs();
for (Span span : segment.getSpans()) {
logger.debug(span.getOperationName());
spansDataBuild(span, segment.getApplicationCode(), segmentId, spanViewList, refsList);
}
}
SpanView rootSpan = findRoot(spanViewList);
if (rootSpan != null) {
findChild(rootSpan, spanViewList, rootSpan.getStartTime());
List<SpanView> viewList = new ArrayList<>();
viewList.add(rootSpan);
String globalTraceStr = gson.toJson(viewList);
JsonObject responseObj = (JsonObject) response;
responseObj.addProperty("result", globalTraceStr);
}
}
}
private SpanView findRoot(List<SpanView> spanViewList) {
for (SpanView spanView : spanViewList) {
if (StringUtil.isEmpty(spanView.getParentSpanSegId())) {
spanView.setRelativeStartTime(0);
spanView.setParentSpanSegId("-1");
return spanView;
}
}
return null;
}
private void findChild(SpanView parentSpan, List<SpanView> spanViewList, long rootStartTime) {
String spanSegId = parentSpan.getSpanSegId();
logger.debug("findChild spanSegId: %s", spanSegId);
List<SpanView> childSpanSort = sortChildSpan(spanViewList, spanSegId);
for (SpanView spanView : childSpanSort) {
spanView.setRelativeStartTime(spanView.getStartTime() - rootStartTime);
parentSpan.addChild(spanView);
findChild(spanView, spanViewList, rootStartTime);
}
}
private List<SpanView> sortChildSpan(List<SpanView> spanViewList, String parentSpanId) {
List<SpanView> tempList = new ArrayList<>();
for (SpanView spanView : spanViewList) {
if (parentSpanId.equals(spanView.getParentSpanSegId())) {
tempList.add(spanView);
}
}
Collections.sort(tempList);
return tempList;
}
private void spansDataBuild(Span span, String appCode, String segmentId, List<SpanView> spanViewList, List<TraceSegmentRef> refsList) {
int spanId = span.getSpanId();
String spanSegId = segmentId + "--" + String.valueOf(spanId);
SpanView spanView = new SpanView();
spanView.setSpanId(spanId);
spanView.setSegId(segmentId);
spanView.setSpanSegId(spanSegId);
spanView.setStartTime(span.getStartTime());
spanView.setOperationName(span.getOperationName());
spanView.setAppCode(appCode);
long cost = span.getEndTime() - span.getStartTime();
if (cost == 0) {
spanView.setCost(1);
} else {
spanView.setCost(cost);
}
if (spanId == 0) {
if (CollectionTools.isNotEmpty(refsList)) {
if (refsList.size() > 1) {
throw new UnsupportedOperationException("not support batch call");
} else {
TraceSegmentRef segmentRef = refsList.get(0);
int parentSpanId = segmentRef.getSpanId();
String parentSegId = segmentRef.getTraceSegmentId();
String parentSpanSegId = parentSegId + "--" + String.valueOf(parentSpanId);
spanView.setParentSpanSegId(parentSpanSegId);
}
}
} else {
int parentSpanId = span.getParentSpanId();
String parentSpanSegId = segmentId + "--" + String.valueOf(parentSpanId);
spanView.setParentSpanSegId(parentSpanSegId);
}
spanViewList.add(spanView);
}
public static class Factory extends AbstractLocalSyncWorkerProvider<GlobalTraceSearchWithGlobalId> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public GlobalTraceSearchWithGlobalId workerInstance(ClusterWorkerContext clusterContext) {
return new GlobalTraceSearchWithGlobalId(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return GlobalTraceSearchWithGlobalId.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
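The search worker above rebuilds one tree out of a flat span list: the root is the only span without a parent reference, children are looked up under `segId--spanId` keys, and start times are rebased onto the root. A stand-alone sketch of that reconstruction with a hypothetical `Node` stand-in for `SpanView` (child ordering, done by `Collections.sort` in `sortChildSpan`, is omitted here for brevity):

```java
import java.util.ArrayList;
import java.util.List;

public class SpanTreeSketch {
    static class Node {
        final String id;        // "segId--spanId", as in spansDataBuild
        final String parentId;  // null or empty for the root span
        final long startTime;
        long relativeStart;
        final List<Node> children = new ArrayList<>();
        Node(String id, String parentId, long startTime) {
            this.id = id; this.parentId = parentId; this.startTime = startTime;
        }
    }

    // Mirrors findRoot: the root is the one span with no parent reference.
    static Node findRoot(List<Node> nodes) {
        for (Node n : nodes) {
            if (n.parentId == null || n.parentId.isEmpty()) {
                n.relativeStart = 0;
                return n;
            }
        }
        return null; // incomplete trace: no root collected yet
    }

    // Mirrors findChild: attach children recursively, rebasing each span's
    // start time onto the root span's start time.
    static void attachChildren(Node parent, List<Node> nodes, long rootStart) {
        for (Node n : nodes) {
            if (parent.id.equals(n.parentId)) {
                n.relativeStart = n.startTime - rootStart;
                parent.children.add(n);
                attachChildren(n, nodes, rootStart);
            }
        }
    }
}
```

Note the recursion walks the full list once per node, so the rebuild is O(n²) in the number of spans; for typical trace sizes that is acceptable, and an index from parent id to children would remove it.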
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.google.gson.JsonObject;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class AbstractGet extends AbstractLocalSyncWorker {
public AbstractGet(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request, Object response) throws Exception {
Map<String, String[]> parameterMap = (Map<String, String[]>) request;
try {
onSearch(parameterMap, (JsonObject) response);
} catch (Exception e) {
e.printStackTrace();
((JsonObject) response).addProperty("isSuccess", false);
((JsonObject) response).addProperty("reason", e.getMessage());
}
}
protected abstract void onSearch(Map<String, String[]> request, JsonObject response) throws Exception;
static class GetWithHttpServlet extends AbstractHttpServlet {
private final LocalSyncWorkerRef ownerWorkerRef;
GetWithHttpServlet(LocalSyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
Map<String, String[]> parameterMap = request.getParameterMap();
JsonObject resJson = new JsonObject();
try {
ownerWorkerRef.ask(parameterMap, resJson);
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
/**
* @author pengys5
*/
public abstract class AbstractGetProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalSyncWorkerProvider<T> {
public abstract String servletPath();
final protected void create(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) super.create(AbstractWorker.noOwner());
AbstractGet.GetWithHttpServlet getWithHttpServlet = new AbstractGet.GetWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(getWithHttpServlet), servletPath());
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonObject;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
/**
* @author pengys5
*/
public abstract class AbstractHttpServlet extends HttpServlet {
final public void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
PrintWriter out = response.getWriter();
out.print(resJson);
out.flush();
out.close();
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
/**
* @author pengys5
*/
public abstract class AbstractPost extends AbstractLocalAsyncWorker {
private Logger logger = LogManager.getFormatterLogger(AbstractPost.class);
public AbstractPost(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request) throws Exception {
if (request instanceof String) {
onReceive((String) request);
} else if (request instanceof EndOfBatchCommand) {
} else {
logger.error("unhandled request, request instance must String, but is %s", request.getClass().toString());
}
}
protected abstract void onReceive(String reqJsonStr) throws Exception;
static class PostWithHttpServlet extends AbstractHttpServlet {
private final LocalAsyncWorkerRef ownerWorkerRef;
protected PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
StringBuilder dataStr = new StringBuilder();
String tmpStr;
while ((tmpStr = bufferedReader.readLine()) != null) {
dataStr.append(tmpStr);
}
ownerWorkerRef.tell(dataStr.toString());
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
}
}
}
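`PostWithHttpServlet` above accumulates the POST body with `readLine`, which strips the line terminators; for a single-line JSON payload that is harmless, but embedded newlines do not survive the loop. A stdlib-only sketch of the same accumulation (`BodyReadSketch` is a hypothetical name):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class BodyReadSketch {
    // Mirrors PostWithHttpServlet.doPost: concatenate the request body
    // line by line. readLine() drops the newline characters themselves.
    static String readBody(BufferedReader reader) throws IOException {
        StringBuilder body = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            body.append(line);
        }
        return body.toString();
    }

    public static void main(String[] args) throws IOException {
        String json = "{\"segmentId\":\"abc\",\n\"spans\":[]}";
        // The embedded newline is gone from the result.
        System.out.println(readBody(new BufferedReader(new StringReader(json))));
    }
}
```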
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
/**
* @author pengys5
*/
public abstract class AbstractPostProvider<T extends AbstractLocalAsyncWorker> extends AbstractLocalAsyncWorkerProvider<T> {
public abstract String servletPath();
final protected void create(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalAsyncWorkerRef workerRef = (LocalAsyncWorkerRef) super.create(AbstractWorker.noOwner());
AbstractPost.PostWithHttpServlet postWithHttpServlet = new AbstractPost.PostWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(postWithHttpServlet), servletPath());
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.Role;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class Controller {
protected abstract NanoHTTPD.Method httpMethod();
protected abstract String path();
protected abstract JsonElement execute(Map<String, String> parms);
protected void tell(Role role, Object message) throws Exception {
// targetMember.beTold(message);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public enum ControllerCenter {
INSTANCE;
private Map<String, Controller> getControllers = new ConcurrentHashMap<>();
private Map<String, Controller> postControllers = new ConcurrentHashMap<>();
protected void register(NanoHTTPD.Method method, String path, Controller controller) throws DuplicateControllerException {
if (NanoHTTPD.Method.GET.equals(method)) {
if (getControllers.containsKey(path)) {
throw new DuplicateControllerException("method: " + method + " with path: " + path + " duplicate each other");
} else {
getControllers.put(path, controller);
}
} else if (NanoHTTPD.Method.POST.equals(method)) {
if (postControllers.containsKey(path)) {
throw new DuplicateControllerException("method: " + method + " with path: " + path + " duplicate each other");
} else {
postControllers.put(path, controller);
}
}
}
protected Controller find(NanoHTTPD.Method method, String path) {
if (NanoHTTPD.Method.GET.equals(method)) {
return getControllers.get(path);
} else if (NanoHTTPD.Method.POST.equals(method)) {
return postControllers.get(path);
} else {
return null;
}
}
}
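`ControllerCenter` keys controllers by (HTTP method, path), one map per method, and rejects duplicate registrations. The same dispatch-table idea in a self-contained sketch (a plain enum replaces `NanoHTTPD.Method`, `Runnable` replaces `Controller`; all names here are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DispatchTableSketch {
    enum Method { GET, POST }

    private final Map<String, Runnable> gets = new ConcurrentHashMap<>();
    private final Map<String, Runnable> posts = new ConcurrentHashMap<>();

    // Mirrors ControllerCenter.register: one table per method,
    // duplicate (method, path) pairs are an error.
    void register(Method method, String path, Runnable handler) {
        Map<String, Runnable> table = (method == Method.GET) ? gets : posts;
        if (table.putIfAbsent(path, handler) != null) {
            throw new IllegalStateException("method: " + method + " with path: " + path + " duplicate each other");
        }
    }

    // Mirrors ControllerCenter.find: null when nothing is registered.
    Runnable find(Method method, String path) {
        return (method == Method.GET) ? gets.get(path) : posts.get(path);
    }
}
```

`putIfAbsent` makes the duplicate check and the insert one atomic step; the original's separate `containsKey`/`put` pair leaves a small race window under concurrent registration, though registration here happens once at boot.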
package com.a.eye.skywalking.collector.worker.httpserver;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ControllerCreator {
INSTANCE;
public void boot() throws Exception {
ServiceLoader<ControllerProvider> controllerLoader = java.util.ServiceLoader.load(ControllerProvider.class);
for (ControllerProvider provider : controllerLoader) {
Controller controller = provider.create();
ControllerCenter.INSTANCE.register(controller.httpMethod(), controller.path(), controller);
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
public class ControllerNotFoundException extends Exception {
public ControllerNotFoundException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
/**
* @author pengys5
*/
public abstract class ControllerProvider {
public abstract Class<? extends Controller> clazz();
public Controller create() throws Exception {
return clazz().newInstance();
}
}
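`ControllerCreator` discovers providers through `java.util.ServiceLoader`, so each concrete `ControllerProvider` must be listed in a provider-configuration file on the classpath. Following the standard ServiceLoader contract (the repository's actual resource file is not shown here), the entry for the `DagController.Factory` defined later in this package would look roughly like:

```
# src/main/resources/META-INF/services/com.a.eye.skywalking.collector.worker.httpserver.ControllerProvider
com.a.eye.skywalking.collector.worker.httpserver.controller.DagController$Factory
```

Note the `$` in the binary name, since `Factory` is a static nested class; ServiceLoader also requires the listed class to have a public no-argument constructor.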
package com.a.eye.skywalking.collector.worker.httpserver;
public class DuplicateControllerException extends Exception {
public DuplicateControllerException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Map;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
/**
* @author pengys5
@@ -16,33 +15,23 @@ public enum HttpServer {
private Logger logger = LogManager.getFormatterLogger(HttpServer.class);
public void boot() throws Exception {
NanoHttpServer server = new NanoHttpServer(7001);
ControllerCreator.INSTANCE.boot();
}
public void boot(ClusterWorkerContext clusterContext) throws Exception {
Server server = new Server(7001);
String contextPath = "/";
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
logger.info("http server root context path: %s", contextPath);
ServletsCreator.INSTANCE.boot(servletContextHandler, clusterContext);
// ServerConnector serverConnector = new ServerConnector(server);
// serverConnector.setHost("127.0.0.1");
// serverConnector.setPort(7001);
// serverConnector.setIdleTimeout(5000);
public class NanoHttpServer extends NanoHTTPD {
public NanoHttpServer(int port) throws IOException {
super(port);
start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
logger.info("Running! Point your browsers to http://localhost:%d/", port);
}
@Override
public Response serve(IHTTPSession session) {
Method method = session.getMethod();
String uri = session.getUri();
Map<String, String> parms = session.getParms();
logger.debug("request method: %s, uri: %s, parms: %s", method.toString(), uri, parms);
try {
JsonElement response = RequestDispatcher.INSTANCE.dispatch(method, uri, parms);
return newFixedLengthResponse(Response.Status.OK, "application/json", response.toString());
} catch (ControllerNotFoundException e) {
String errorMessage = e.getMessage();
return newFixedLengthResponse(Response.Status.NOT_FOUND, "text/html", errorMessage);
}
}
server.setHandler(servletContextHandler);
server.start();
server.join();
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public enum RequestDispatcher {
INSTANCE;
public JsonElement dispatch(NanoHTTPD.Method method, String uri, Map<String, String> parms) throws ControllerNotFoundException {
Controller controller = ControllerCenter.INSTANCE.find(method, uri);
if (controller != null) {
return controller.execute(parms);
} else {
throw new ControllerNotFoundException("Could not find a controller for [method: " + method.name() + ", uri: " + uri + "]");
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.servlet.ServletContextHandler;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ServletsCreator {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(ServletsCreator.class);
public void boot(ServletContextHandler servletContextHandler, ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
ServiceLoader<AbstractPostProvider> receiverLoader = ServiceLoader.load(AbstractPostProvider.class);
for (AbstractPostProvider provider : receiverLoader) {
provider.setClusterContext(clusterContext);
provider.create(servletContextHandler);
logger.info("add post servlet mapping path: %s ", provider.servletPath());
}
ServiceLoader<AbstractGetProvider> searcherLoader = ServiceLoader.load(AbstractGetProvider.class);
for (AbstractGetProvider provider : searcherLoader) {
provider.setClusterContext(clusterContext);
provider.create(servletContextHandler);
logger.info("add get servlet mapping path: %s ", provider.servletPath());
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver.controller;
import com.a.eye.skywalking.collector.worker.httpserver.Controller;
import com.a.eye.skywalking.collector.worker.httpserver.ControllerProvider;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public class DagController extends Controller {
@Override
public NanoHTTPD.Method httpMethod() {
return NanoHTTPD.Method.GET;
}
@Override
public String path() {
return "/getNodes";
}
@Override
public JsonElement execute(Map<String, String> parms) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("test", "aaaa");
return jsonObject;
}
public static class Factory extends ControllerProvider {
@Override
public Class<? extends Controller> clazz() {
return DagController.class;
}
}
}
package com.a.eye.skywalking.collector.worker.node;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
/**
* @author pengys5
*/
public class NodeCompIndex extends AbstractIndex {
public static final String Index = "node_comp_idx";
public static final String Name = "name";
public static final String Peers = "peers";
@Override
public String index() {
return Index;
}
@Override
public boolean isRecord() {
return false;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(Name)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Peers)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
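For reference, the `XContentBuilder` chain above emits a mapping along these lines; the actual field name behind `AGG_COLUMN` is defined in `AbstractIndex` and is not visible here, so it appears as a placeholder:

```json
{
  "properties": {
    "name":         { "type": "string", "index": "not_analyzed" },
    "peers":        { "type": "string", "index": "not_analyzed" },
    "<AGG_COLUMN>": { "type": "string", "index": "not_analyzed" }
  }
}
```

`not_analyzed` keeps the whole value as a single term, so component names and peer strings can be matched and aggregated exactly rather than tokenized.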
package com.a.eye.skywalking.collector.worker.node;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
/**
* @author pengys5
*/
public class NodeMappingIndex extends AbstractIndex {
public static final String Index = "node_mapping_idx";
public static final String Code = "code";
public static final String Peers = "peers";
@Override
public String index() {
return Index;
}
@Override
public boolean isRecord() {
return false;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(Code)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Peers)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
import com.a.eye.skywalking.collector.worker.tools.ClientSpanIsLeafTools;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.collector.worker.tools.SpanPeersTools;
import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.tag.Tags;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeCompAnalysis.class);
AbstractNodeCompAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
void analyseSpans(TraceSegment segment) throws Exception {
List<Span> spanList = segment.getSpans();
logger.debug("node analysis span isNotEmpty %s", CollectionTools.isNotEmpty(spanList));
if (CollectionTools.isNotEmpty(spanList)) {
logger.debug("node analysis span list size: %s", spanList.size());
for (Span span : spanList) {
String kind = Tags.SPAN_KIND.get(span);
if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
String peers = SpanPeersTools.getPeers(span);
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.Peers, peers);
compJsonObj.addProperty(NodeCompIndex.Name, Tags.COMPONENT.get(span));
setRecord(peers, compJsonObj);
} else if (Tags.SPAN_KIND_SERVER.equals(kind) && span.getParentSpanId() == -1) {
String peers = segment.getApplicationCode();
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.Peers, peers);
compJsonObj.addProperty(NodeCompIndex.Name, Tags.COMPONENT.get(span));
setRecord(peers, compJsonObj);
} else {
logger.error("Incorrect span kind in segment (trace segment id: %s); the value must be client or server", segment.getTraceSegmentId());
}
}
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.RecordAnalysisMember;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeMappingAnalysis.class);
AbstractNodeMappingAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
void analyseRefs(TraceSegment segment, long timeSlice) throws Exception {
List<TraceSegmentRef> segmentRefList = segment.getRefs();
logger.debug("node mapping analysis refs isNotEmpty %s", CollectionTools.isNotEmpty(segmentRefList));
if (CollectionTools.isNotEmpty(segmentRefList)) {
logger.debug("node mapping analysis refs list size: %s", segmentRefList.size());
for (TraceSegmentRef segmentRef : segmentRefList) {
String peers = Const.PEERS_FRONT_SPLIT + segmentRef.getPeerHost() + Const.PEERS_BEHIND_SPLIT;
String code = segment.getApplicationCode();
JsonObject nodeMappingJsonObj = new JsonObject();
nodeMappingJsonObj.addProperty(NodeMappingIndex.Code, code);
nodeMappingJsonObj.addProperty(NodeMappingIndex.Peers, peers);
nodeMappingJsonObj.addProperty(NodeMappingIndex.Time_Slice, timeSlice);
String id = timeSlice + Const.ID_SPLIT + code + Const.ID_SPLIT + peers;
setRecord(id, nodeMappingJsonObj);
}
}
}
}
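The record id built above concatenates the time slice, the application code, and the peers string with `Const.ID_SPLIT`, while the peers value itself is wrapped in `PEERS_FRONT_SPLIT`/`PEERS_BEHIND_SPLIT`. The actual separator values live in `com.a.eye.skywalking.collector.worker.Const` and are not shown here, so the sketch below assumes hypothetical ones purely for illustration:

```java
class NodeMappingIdSketch {
    // Hypothetical separator values; the real ones are defined in Const.
    static final String ID_SPLIT = "..";
    static final String PEERS_FRONT_SPLIT = "[";
    static final String PEERS_BEHIND_SPLIT = "]";

    // Mirrors the id composition in AbstractNodeMappingAnalysis.analyseRefs.
    static String buildId(long timeSlice, String applicationCode, String peerHost) {
        String peers = PEERS_FRONT_SPLIT + peerHost + PEERS_BEHIND_SPLIT;
        return timeSlice + ID_SPLIT + applicationCode + ID_SPLIT + peers;
    }
}
```

Because the time slice leads the id, records for the same (code, peers) pair in different slices stay distinct, and the id doubles as the aggregation key handed to `setRecord`.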
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeCompAgg;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
NodeCompAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof TraceSegment) {
TraceSegment segment = (TraceSegment) message;
analyseSpans(segment);
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeCompAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeCompAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeCompAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeCompAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeDayAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeCompAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingDayAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class NodeMappingDayAnalysis extends AbstractNodeMappingAnalysis {
public NodeMappingDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getDay());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeMappingDayAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingDayAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMappingDayAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingDayAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingDayAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingDayAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingHourAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class NodeMappingHourAnalysis extends AbstractNodeMappingAnalysis {
public NodeMappingHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getHour());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeMappingHourAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingHourAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMappingHourAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingHourAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingHourAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingHourAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.analysis;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.persistence.NodeMappingMinuteAgg;
import com.a.eye.skywalking.collector.worker.segment.SegmentPost;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
import com.a.eye.skywalking.trace.TraceSegment;
/**
* @author pengys5
*/
public class NodeMappingMinuteAnalysis extends AbstractNodeMappingAnalysis {
public NodeMappingMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getMinute());
}
}
@Override
protected void aggregation() throws Exception {
RecordData oneRecord;
while ((oneRecord = pushOne()) != null) {
getClusterContext().lookup(NodeMappingMinuteAgg.Role.INSTANCE).tell(oneRecord);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingMinuteAnalysis> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMappingMinuteAnalysis workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingMinuteAnalysis(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingMinuteAnalysis.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingMinuteAnalysis.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
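`NodeMappingDayAnalysis`, `NodeMappingHourAnalysis`, and `NodeMappingMinuteAnalysis` differ only in which time slice of the incoming `SegmentWithTimeSlice` they pass to `analyseRefs`. The exact slice encoding is defined in `SegmentPost` and is not shown here; assuming numeric timestamps truncated to day/hour/minute precision in UTC, the three slices could be derived like this:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TimeSliceSketch {
    // Hypothetical encodings: yyyyMMdd, yyyyMMddHH, yyyyMMddHHmm in UTC.
    private static long format(Instant ts, String pattern) {
        return Long.parseLong(DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC).format(ts));
    }

    static long daySlice(Instant ts)    { return format(ts, "yyyyMMdd"); }
    static long hourSlice(Instant ts)   { return format(ts, "yyyyMMddHH"); }
    static long minuteSlice(Instant ts) { return format(ts, "yyyyMMddHHmm"); }
}
```

Computing all three slices once per segment lets a single incoming message fan out to the day, hour, and minute analysis workers without reparsing the timestamp.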
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
* @author pengys5
*/
public class NodeCompAgg extends AbstractClusterWorker {
public NodeCompAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeCompSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeCompSave.Role.INSTANCE).tell(message);
} else {
throw new IllegalArgumentException("message must be an instance of RecordData");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeCompAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeCompAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeCompAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeDayAgg.Value;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeCompAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
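Note the selector choice: `NodeCompAgg` routes with a `HashCodeSelector` so that records with the same key always reach the same worker instance (a requirement for correct aggregation), while the analysis workers use a `RollingSelector` that spreads load round-robin. A minimal sketch of the two strategies, using simplified hypothetical signatures rather than the actual `WorkerSelector` API:

```java
import java.util.List;

class SelectorSketch {
    // Stable routing: the same key always maps to the same worker index.
    static int hashCodeSelect(List<String> workers, String key) {
        return Math.abs(key.hashCode() % workers.size());
    }

    // Round-robin: each call advances to the next worker in turn.
    private static int next = 0;
    static int rollingSelect(List<String> workers) {
        int idx = next % workers.size();
        next = idx + 1;
        return idx;
    }
}
```

Stable hashing matters for aggregation because partial counts for one key must accumulate in a single worker's state; analysis has no such affinity, so rolling selection maximizes throughput.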
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.search.SearchHit;
/**
* @author pengys5
*/
public class NodeCompLoad extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(NodeCompLoad.class);
NodeCompLoad(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws Exception {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeCompIndex.Index);
searchRequestBuilder.setTypes(NodeCompIndex.Type_Record);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(100);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
SearchHit[] searchHits = searchResponse.getHits().getHits();
JsonArray nodeCompArray = new JsonArray();
for (SearchHit searchHit : searchHits) {
JsonObject nodeCompObj = new JsonObject();
nodeCompObj.addProperty(NodeCompIndex.Name, (String) searchHit.getSource().get(NodeCompIndex.Name));
nodeCompObj.addProperty(NodeCompIndex.Peers, (String) searchHit.getSource().get(NodeCompIndex.Peers));
nodeCompArray.add(nodeCompObj);
logger.debug("node: %s", nodeCompObj.toString());
}
JsonObject resJsonObj = (JsonObject) response;
resJsonObj.add("result", nodeCompArray);
}
public static class Factory extends AbstractLocalSyncWorkerProvider<NodeCompLoad> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeCompLoad workerInstance(ClusterWorkerContext clusterContext) {
return new NodeCompLoad(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeCompLoad.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
/**
* @author pengys5
*/
public class NodeCompSave extends RecordPersistenceMember {
NodeCompSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeCompIndex.Index;
}
@Override
public String esType() {
return NodeCompIndex.Type_Record;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeCompSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeCompSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeCompSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeCompSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
* @author pengys5
*/
public class NodeMappingDayAgg extends AbstractClusterWorker {
public NodeMappingDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeMappingDaySave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeMappingDaySave.Role.INSTANCE).tell(message);
} else {
throw new IllegalArgumentException("message must be an instance of RecordData");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeMappingDayAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMappingDayAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingDayAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeMappingDayAgg.Value;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingDayAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.application.persistence;
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
/**
* @author pengys5
*/
public class DAGNodePersistence extends RecordPersistenceMember {
public class NodeMappingDaySave extends RecordPersistenceMember {
private Logger logger = LogManager.getFormatterLogger(DAGNodePersistence.class);
public DAGNodePersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public NodeMappingDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return "application";
return NodeMappingIndex.Index;
}
@Override
public String esType() {
return "dag_node";
return NodeMappingIndex.Type_Day;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodePersistence> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingDaySave> {
public static Factory INSTANCE = new Factory();
@Override
@@ -40,8 +37,8 @@ public class DAGNodePersistence extends RecordPersistenceMember {
}
@Override
public DAGNodePersistence workerInstance(ClusterWorkerContext clusterContext) {
return new DAGNodePersistence(role(), clusterContext, new LocalWorkerContext());
public NodeMappingDaySave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingDaySave(role(), clusterContext, new LocalWorkerContext());
}
@Override
@@ -55,12 +52,12 @@ public class DAGNodePersistence extends RecordPersistenceMember {
@Override
public String roleName() {
return DAGNodePersistence.class.getSimpleName();
return NodeMappingDaySave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
return new HashCodeSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
* @author pengys5
*/
public class NodeMappingHourAgg extends AbstractClusterWorker {
NodeMappingHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeMappingHourSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeMappingHourSave.Role.INSTANCE).tell(message);
} else {
throw new IllegalArgumentException("message must be an instance of RecordData");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeMappingHourAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMappingHourAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingHourAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeMappingHourAgg.Value;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingHourAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
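Every worker role in this change set returns a `HashCodeSelector` where the old classes used a `RollingSelector`. A minimal sketch of what hash-based selection buys over round-robin (the `Worker` type and `select` method below are illustrative stand-ins, not the project's actual `WorkerSelector` API): records carrying the same key always route to the same worker instance, which keeps per-key aggregation on a single worker.

```java
import java.util.List;

// Hypothetical sketch of hash-based worker selection: the message key's
// hash picks a fixed worker, so identical keys never scatter across workers.
public class HashSelectorSketch {

    // Stand-in for a worker reference; only identity matters for the demo.
    static class Worker {
        final String name;
        Worker(String name) { this.name = name; }
    }

    // Assumed selection rule: index by hash modulo the worker count.
    // floorMod avoids a negative index when hashCode() is negative.
    static Worker select(List<Worker> workers, String messageKey) {
        int index = Math.floorMod(messageKey.hashCode(), workers.size());
        return workers.get(index);
    }

    public static void main(String[] args) {
        List<Worker> workers = List.of(new Worker("w0"), new Worker("w1"), new Worker("w2"));
        // Both calls resolve to the same worker: hashing is deterministic.
        Worker first = select(workers, "portal-service->order-service");
        Worker second = select(workers, "portal-service->order-service");
        System.out.println(first == second);
    }
}
```

A rolling selector would hand the two calls to different workers, so two aggregators could each hold half of one key's records; that is presumably why the persistence roles here moved to hash selection.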
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
/**
* @author pengys5
*/
public class NodeMappingHourSave extends RecordPersistenceMember {
public NodeMappingHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeMappingIndex.Index;
}
@Override
public String esType() {
return NodeMappingIndex.Type_Hour;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingHourSave> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMappingHourSave workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingHourSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingHourSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
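The Day/Hour/Minute `Save` workers differ only in `esType()`, while all three share `NodeMappingIndex.Index`. A small sketch of that convention (the constant values below are assumptions for illustration; the real values live in `NodeMappingIndex`): one Elasticsearch index per record family, with one type per aggregation granularity.

```java
// Hypothetical sketch of the index/type layout used by the Save workers:
// a single index, partitioned by aggregation granularity via the type name.
public class NodeIndexSketch {
    static final String INDEX = "node_mapping";  // assumed value of NodeMappingIndex.Index
    static final String TYPE_MINUTE = "minute";  // assumed value of Type_Minute
    static final String TYPE_HOUR = "hour";      // assumed value of Type_Hour
    static final String TYPE_DAY = "day";        // assumed value of Type_Day

    // Each persistence worker writes to the same index but its own type.
    static String target(String type) {
        return INDEX + "/" + type;
    }

    public static void main(String[] args) {
        System.out.println(target(TYPE_HOUR)); // node_mapping/hour
    }
}
```

Centralizing the names in one `NodeMappingIndex` class, rather than the string literals the old `DAGNodePersistence` returned, is what lets three workers stay consistent about where records land.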
package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.RecordData;
/**
* @author pengys5
*/
public class NodeMappingMinuteAgg extends AbstractClusterWorker {
NodeMappingMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeMappingMinuteSave.Role.INSTANCE).create(this);
}
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeMappingMinuteSave.Role.INSTANCE).tell(message);
} else {
throw new IllegalArgumentException("message must be an instance of RecordData");
}
}
public static class Factory extends AbstractClusterWorkerProvider<NodeMappingMinuteAgg> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return Role.INSTANCE;
}
@Override
public NodeMappingMinuteAgg workerInstance(ClusterWorkerContext clusterContext) {
return new NodeMappingMinuteAgg(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeMappingMinuteAgg.Value;
}
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return NodeMappingMinuteAgg.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
}
}
-package com.a.eye.skywalking.collector.worker.applicationref.persistence;
+package com.a.eye.skywalking.collector.worker.node.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
-import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
+import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.RecordPersistenceMember;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
/**
* @author pengys5
*/
-public class DAGNodeRefPersistence extends RecordPersistenceMember {
+public class NodeMappingMinuteSave extends RecordPersistenceMember {
-private Logger logger = LogManager.getFormatterLogger(DAGNodeRefPersistence.class);
-public DAGNodeRefPersistence(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
+public NodeMappingMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
-return "node_ref";
+return NodeMappingIndex.Index;
}
@Override
public String esType() {
-return "node_ref";
+return NodeMappingIndex.Type_Minute;
}
-public static class Factory extends AbstractLocalAsyncWorkerProvider<DAGNodeRefPersistence> {
+public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingMinuteSave> {
public static Factory INSTANCE = new Factory();
@Override
@@ -41,13 +37,13 @@ public class DAGNodeRefPersistence extends RecordPersistenceMember {
}
@Override
-public DAGNodeRefPersistence workerInstance(ClusterWorkerContext clusterContext) {
-return new DAGNodeRefPersistence(role(), clusterContext, new LocalWorkerContext());
+public NodeMappingMinuteSave workerInstance(ClusterWorkerContext clusterContext) {
+return new NodeMappingMinuteSave(role(), clusterContext, new LocalWorkerContext());
}
@Override
public int queueSize() {
-return WorkerConfig.Queue.Persistence.DAGNodeRefPersistence.Size;
+return WorkerConfig.Queue.Persistence.DAGNodePersistence.Size;
}
}
@@ -56,12 +52,12 @@ public class DAGNodeRefPersistence extends RecordPersistenceMember {
@Override
public String roleName() {
-return DAGNodeRefPersistence.class.getSimpleName();
+return NodeMappingMinuteSave.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
-return new RollingSelector();
+return new HashCodeSelector();
}
}
}
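Each `*Agg` cluster worker above is a thin router: `onWork` type-checks the incoming message and forwards any `RecordData` to the matching `*Save` persistence worker, rejecting everything else. The hand-off can be modeled in isolation like this (the `RecordData`, `SaveWorker`, and `AggWorker` types here are simplified stand-ins for the project's actor classes):

```java
// Hypothetical minimal model of the Agg -> Save hand-off: the aggregator
// validates the message type, then tells the persistence worker.
public class AggForwardSketch {

    static class RecordData {
        final String id;
        RecordData(String id) { this.id = id; }
    }

    // Stand-in for the persistence member; records what it was told.
    static class SaveWorker {
        final java.util.List<RecordData> received = new java.util.ArrayList<>();
        void tell(RecordData data) { received.add(data); }
    }

    static class AggWorker {
        final SaveWorker save;
        AggWorker(SaveWorker save) { this.save = save; }

        // Mirrors the onWork methods above: accept only RecordData.
        void onWork(Object message) {
            if (message instanceof RecordData) {
                save.tell((RecordData) message);
            } else {
                throw new IllegalArgumentException("unexpected message type");
            }
        }
    }

    public static void main(String[] args) {
        SaveWorker save = new SaveWorker();
        AggWorker agg = new AggWorker(save);
        agg.onWork(new RecordData("node-mapping-1"));
        System.out.println(save.received.size()); // 1
    }
}
```

Splitting aggregation (cluster-wide, hash-routed) from persistence (local, async, queue-backed) is the design point: the `Agg` side decides *which* node owns a record, and the `Save` side batches it into Elasticsearch.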