Commit 73417f92 authored by pengys5

add DAG node search service using Jetty servlet

Parent f71ccb81
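The searcher workers added below are exposed as Jetty servlets (for example /dagNode/search/serverNodeWithTimeSlice) and answer HTTP GET requests carrying timeSliceType and timeSliceValue parameters. A minimal client sketch follows; the host/port (localhost:8080) and the sample parameter values are placeholders only, since the HTTP server's bind address is not part of this commit.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class DagNodeSearchClientExample {
    public static void main(String[] args) throws Exception {
        // placeholder address: the collector's HTTP bind host/port is configured outside this commit
        URL url = new URL("http://localhost:8080/dagNode/search/serverNodeWithTimeSlice"
            + "?timeSliceType=minute&timeSliceValue=201703271200");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // the servlet replies with a JSON body such as {"result":[...]}
            }
        }
    }
}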
......@@ -17,6 +17,7 @@
<properties>
<akka.version>2.4.17</akka.version>
<log4j.version>2.8.1</log4j.version>
</properties>
<dependencies>
......@@ -26,9 +27,14 @@
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-api</artifactId>
<version>${project.version}</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
......
......@@ -4,12 +4,12 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ServiceLoader;
......@@ -17,7 +17,8 @@ import java.util.ServiceLoader;
* @author pengys5
*/
public class CollectorSystem {
private ILog logger = LogManager.getLogger(CollectorSystem.class);
private Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);
private ClusterWorkerContext clusterContext;
public LookUp getClusterContext() {
......@@ -36,14 +37,11 @@ public class CollectorSystem {
}
private void createAkkaSystem() {
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes)).
withFallback(ConfigFactory.load("application.conf"));
ActorSystem akkaSystem = ActorSystem.create("ClusterSystem", config);
ActorSystem akkaSystem = ActorSystem.create(Const.SystemName, config);
clusterContext = new ClusterWorkerContext(akkaSystem);
}
......
......@@ -7,8 +7,8 @@ import akka.cluster.Member;
import akka.cluster.MemberStatus;
import com.a.eye.skywalking.collector.cluster.WorkerListenerMessage;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
......@@ -19,8 +19,15 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
super(role, clusterContext, selfContext);
}
final public void allocateJob(Object message) throws Exception {
onWork(message);
}
protected abstract void onWork(Object message) throws Exception;
static class WorkerWithAkka extends UntypedActor {
private static ILog logger = LogManager.getLogger(WorkerWithAkka.class);
private Logger logger = LogManager.getFormatterLogger(WorkerWithAkka.class);
private Cluster cluster;
private final AbstractClusterWorker ownerWorker;
......@@ -59,7 +66,7 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
register(memberUp.member());
} else {
logger.debug("worker class: %s, message class: %s", this.getClass().getName(), message.getClass().getName());
ownerWorker.work(message);
ownerWorker.allocateJob(message);
}
}
......
......@@ -14,6 +14,16 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
}
final public void allocateJob(Object request) throws Exception {
onWork(request);
}
protected abstract void onWork(Object request) throws Exception;
static class WorkerWithDisruptor implements EventHandler<MessageHolder> {
private RingBuffer<MessageHolder> ringBuffer;
......@@ -28,9 +38,10 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
try {
Object message = event.getMessage();
event.reset();
asyncWorker.work(message);
asyncWorker.allocateJob(message);
if (endOfBatch) {
asyncWorker.work(new EndOfBatchCommand());
asyncWorker.allocateJob(new EndOfBatchCommand());
}
} catch (Exception e) {
e.printStackTrace();
......
......@@ -7,4 +7,14 @@ public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final public void allocateJob(Object request, Object response) throws Exception {
onWork(request, response);
}
protected abstract void onWork(Object request, Object response) throws Exception;
@Override
public void preStart() throws ProviderNotFoundException {
}
}
......@@ -18,8 +18,6 @@ public abstract class AbstractWorker {
public abstract void preStart() throws ProviderNotFoundException;
public abstract void work(Object message) throws Exception;
final public LookUp getSelfContext() {
return selfContext;
}
......
package com.a.eye.skywalking.collector.actor;
import akka.actor.ActorSystem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -11,7 +11,8 @@ import java.util.concurrent.ConcurrentHashMap;
* @author pengys5
*/
public class ClusterWorkerContext extends WorkerContext {
private ILog logger = LogManager.getLogger(ClusterWorkerContext.class);
private Logger logger = LogManager.getFormatterLogger(ClusterWorkerContext.class);
private final ActorSystem akkaSystem;
private Map<String, AbstractWorkerProvider> providers = new ConcurrentHashMap<>();
......
......@@ -14,6 +14,10 @@ public class LocalSyncWorkerRef extends WorkerRef {
@Override
public void tell(Object message) throws Exception {
localSyncWorker.work(message);
localSyncWorker.allocateJob(message, null);
}
public void ask(Object request, Object response) throws Exception {
localSyncWorker.allocateJob(request, response);
}
}
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import java.util.List;
/**
......@@ -11,7 +11,7 @@ import java.util.List;
*/
public class WorkerRefs<T extends WorkerRef> {
private static ILog logger = LogManager.getLogger(WorkerRefs.class);
private Logger logger = LogManager.getFormatterLogger(WorkerRefs.class);
private List<T> workerRefs;
private WorkerSelector workerSelector;
......@@ -25,4 +25,13 @@ public class WorkerRefs<T extends WorkerRef> {
logger.debug("WorkerSelector instance of %s", workerSelector.getClass());
workerSelector.select(workerRefs, message).tell(message);
}
public void ask(Object request, Object response) throws Exception {
WorkerRef workerRef = workerSelector.select(workerRefs, request);
if (workerRef instanceof LocalSyncWorkerRef) {
((LocalSyncWorkerRef) workerRef).ask(request, response);
} else {
throw new IllegalAccessError("only local sync worker can ask");
}
}
}
......@@ -6,7 +6,7 @@ package com.a.eye.skywalking.collector.cluster;
* {@link Cluster.Current#port} is the port the server binds to.
* {@link Cluster.Current#roles} are the roles of the workers to be created
* in this process.
* {@link Cluster#seed_nodes} is a seed_nodes which cluster have.
* {@link Cluster#seed_nodes} is the list of seed nodes in the cluster, given as a string, e.g. seed_nodes = "ip:port,ip:port".
*
* @author pengys5
*/
......
......@@ -2,8 +2,9 @@ package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.api.util.ConfigInitializer;
import com.a.eye.skywalking.api.util.StringUtil;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.util.Properties;
......@@ -20,7 +21,7 @@ import java.util.Properties;
*/
public class ClusterConfigInitializer {
private static ILog logger = LogManager.getLogger(ClusterConfigInitializer.class);
private static Logger logger = LogManager.getFormatterLogger(ClusterConfigInitializer.class);
public static final String ConfigFileName = "collector.config";
......
package com.a.eye.skywalking.collector.cluster;
/**
* @author pengys5
*/
public class Const {
public static final String SystemName = "ClusterSystem";
}
......@@ -5,10 +5,10 @@ import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.ClusterWorkerRef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Iterator;
import java.util.Map;
......@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class WorkersListener extends UntypedActor {
private ILog logger = LogManager.getLogger(WorkersListener.class);
private Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
public static final String WorkName = "WorkersListener";
......@@ -52,7 +52,7 @@ public class WorkersListener extends UntypedActor {
if (message instanceof WorkerListenerMessage.RegisterMessage) {
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
ActorRef sender = getSender();
// logger.info("register worker of role: %s, path: %s", register.getWorkRole(), sender.toString());
logger.info("register worker of role: %s, path: %s", register.getRole().roleName(), sender.toString());
ClusterWorkerRef workerRef = new ClusterWorkerRef(sender, register.getRole());
relation.put(sender, workerRef);
clusterContext.put(new ClusterWorkerRef(sender, register.getRole()));
......
......@@ -20,7 +20,7 @@ public class TestClusterWorker extends AbstractClusterWorker {
}
@Override
public void work(Object message) throws Exception {
public void onWork(Object message) throws Exception {
if (message.equals("Print")) {
System.out.println(message);
} else if (message.equals("TellLocalWorker")) {
......
package com.a.eye.skywalking.collector.actor;
import com.a.eye.skywalking.collector.CollectorSystem;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
......@@ -12,18 +9,18 @@ public class TestClusterWorkerTestCase {
private CollectorSystem collectorSystem;
// @Before
// @Before
public void createSystem() throws Exception {
collectorSystem = new CollectorSystem();
collectorSystem.boot();
}
// @After
// @After
public void terminateSystem() {
collectorSystem.terminate();
}
// @Test
// @Test
public void testTellWorker() throws Exception {
WorkerRefs workerRefs = collectorSystem.getClusterContext().lookup(TestClusterWorker.TestClusterWorkerRole.INSTANCE);
workerRefs.tell("Print");
......
......@@ -18,7 +18,7 @@ public class TestLocalAsyncWorker extends AbstractLocalAsyncWorker {
}
@Override
public void work(Object message) throws Exception {
public void onWork(Object message) throws Exception {
if (message.equals("TellLocalAsyncWorker")) {
System.out.println("hello async!");
}
......
......@@ -18,8 +18,8 @@ public class TestLocalSyncWorker extends AbstractLocalSyncWorker {
}
@Override
public void work(Object message) throws Exception {
if (message.equals("TellLocalWorker")) {
public void onWork(Object request, Object response) throws Exception {
if (request.equals("TellLocalWorker")) {
System.out.println("hello! ");
} else {
System.out.println("unhandled");
......
package com.a.eye.skywalking.collector.commons.config;
/**
* @author pengys5
*/
public enum SeedNodesFormatter {
INSTANCE;
public String formatter(String seedNodes) {
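// placeholder in this commit: the "ip:port,ip:port" seed node list is not actually formatted yet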
return null;
}
}
package com.a.eye.skywalking.collector.commons.serializer;
import akka.serialization.JSerializer;
import com.a.eye.skywalking.logging.ILog;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.proto.SegmentMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class TraceSegmentSerializer extends JSerializer {
private static ILog logger = LogManager.getLogger(TraceSegmentSerializer.class);
private Logger logger = LogManager.getFormatterLogger(TraceSegmentSerializer.class);
@Override
public boolean includeManifest() {
......
......@@ -29,5 +29,6 @@ akka {
cluster {
auto-down-unreachable-after = off
metrics.enabled = off
roles = ["WorkersListener"]
}
}
......@@ -12,6 +12,10 @@
<artifactId>skywalking-collector-worker</artifactId>
<packaging>jar</packaging>
<properties>
<jetty.version>9.4.2.v20170220</jetty.version>
</properties>
<dependencies>
<dependency>
<groupId>com.a.eye</groupId>
......@@ -20,18 +24,18 @@
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-logging-log4j2</artifactId>
<artifactId>skywalking-collector-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.a.eye</groupId>
<artifactId>skywalking-collector-commons</artifactId>
<version>${project.version}</version>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.nanohttpd</groupId>
<artifactId>nanohttpd</artifactId>
<version>2.3.1</version>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
......@@ -45,35 +49,4 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<relocations>
<relocation>
<pattern>${shade.akka.source}</pattern>
<shadedPattern>${shade.akka.target}</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -24,7 +24,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
}
@Override
public void work(Object message) throws Exception {
public void onWork(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
......
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.cluster.ClusterConfig;
import com.a.eye.skywalking.collector.CollectorSystem;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.cluster.ClusterConfigInitializer;
import com.a.eye.skywalking.logging.LogManager;
import com.a.eye.skywalking.logging.log4j2.Log4j2Resolver;
import com.a.eye.skywalking.collector.worker.httpserver.WebServer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.a.eye.skywalking.collector.worker.httpserver.HttpServer;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class CollectorBootStartUp {
/**
* TODO pengys5, make the exception clear.
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
LogManager.setLogResolver(new Log4j2Resolver());
private static Logger logger = LogManager.getFormatterLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws Exception {
logger.info("collector system starting....");
ClusterConfigInitializer.initialize("collector.config");
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.parseString("akka.cluster.roles=" + ClusterConfig.Cluster.Current.roles)).
withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + ClusterConfig.Cluster.seed_nodes)).
withFallback(ConfigFactory.load("application.conf"));
// ActorSystem system = ActorSystem.create(ClusterConfig.Cluster.appname, config);
// WorkersCreator.INSTANCE.boot(system);
WebServer.INSTANCE.boot();
// EsClient.boot();
CollectorSystem collectorSystem = new CollectorSystem();
collectorSystem.boot();
EsClient.boot();
HttpServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
}
}
......@@ -24,11 +24,11 @@ public abstract class PersistenceMember extends AbstractLocalAsyncWorker {
@Override
public void preStart() throws ProviderNotFoundException {
}
@Override
public void work(Object message) throws Exception {
protected void onWork(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
persistence();
} else {
......
package com.a.eye.skywalking.collector.worker;
/**
* @author pengys5
*/
public abstract class TimeSlice {
private long timeSlice;
private String sliceType;
public TimeSlice(String sliceType, long timeSlice) {
this.timeSlice = timeSlice;
this.sliceType = sliceType;
}
public long getTimeSlice() {
return timeSlice;
}
public String getSliceType() {
return sliceType;
}
}
......@@ -13,10 +13,11 @@ import com.a.eye.skywalking.trace.Span;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import com.a.eye.skywalking.trace.tag.Tags;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
/**
* @author pengys5
*/
......@@ -38,10 +39,10 @@ public class ApplicationMain extends AbstractLocalSyncWorker {
}
@Override
public void work(Object message) throws Exception {
if (message instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof TraceSegmentReceiver.TraceSegmentTimeSlice) {
logger.debug("begin translate TraceSegment Object to JsonObject");
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) request;
getSelfContext().lookup(TraceSegmentRecordPersistence.Role.INSTANCE).tell(traceSegment);
......
......@@ -26,7 +26,7 @@ public class DAGNodeReceiver extends AbstractClusterWorker {
}
@Override
public void work(Object message) throws Exception {
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(DAGNodePersistence.Role.INSTANCE).tell(message);
} else {
......
......@@ -26,7 +26,7 @@ public class NodeInstanceReceiver extends AbstractClusterWorker {
}
@Override
public void work(Object message) throws Exception {
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeInstancePersistence.Role.INSTANCE).tell(message);
} else {
......
......@@ -26,7 +26,7 @@ public class ResponseCostReceiver extends AbstractClusterWorker {
}
@Override
public void work(Object message) throws Exception {
protected void onWork(Object message) throws Exception {
if (message instanceof MetricData) {
getSelfContext().lookup(ResponseCostPersistence.Role.INSTANCE).tell(message);
} else {
......
......@@ -26,7 +26,7 @@ public class ResponseSummaryReceiver extends AbstractClusterWorker {
}
@Override
public void work(Object message) throws Exception {
protected void onWork(Object message) throws Exception {
if (message instanceof MetricData) {
getSelfContext().lookup(ResponseSummaryPersistence.Role.INSTANCE).tell(message);
} else {
......
......@@ -7,6 +7,7 @@ import com.a.eye.skywalking.collector.worker.applicationref.analysis.DAGNodeRefA
import com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.trace.TraceSegmentRef;
import java.util.List;
/**
......@@ -26,12 +27,12 @@ public class ApplicationRefMain extends AbstractLocalSyncWorker {
}
@Override
public void work(Object message) throws Exception {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) message;
protected void onWork(Object request, Object response) throws Exception {
TraceSegmentReceiver.TraceSegmentTimeSlice traceSegment = (TraceSegmentReceiver.TraceSegmentTimeSlice) request;
TraceSegment segment = traceSegment.getTraceSegment();
List<TraceSegmentRef> refs = segment.getRefs();
if(refs != null){
if (refs != null) {
for (TraceSegmentRef ref : refs) {
String front = ref.getApplicationCode();
String behind = segment.getApplicationCode();
......
......@@ -28,7 +28,7 @@ public class DAGNodeRefReceiver extends AbstractClusterWorker {
}
@Override
public void work(Object message) throws Exception {
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(DAGNodeRefPersistence.Role.INSTANCE).tell(message);
} else {
......
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ClientNodeIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
/**
* @author pengys5
*/
public class ClientNodeSearchPersistence extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(ClientNodeSearchPersistence.class);
public ClientNodeSearchPersistence(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
JsonObject resJsonObj = (JsonObject) response;
JsonArray result = search(search.getSliceType(), search.getTimeSlice());
resJsonObj.add("result", result);
} else {
throw new IllegalArgumentException("message instance must be RequestEntity");
}
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(ClientNodeIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery("timeSlice", timeSlice));
SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHit[] hits = response.getHits().getHits();
logger.debug("client node list size: %s", hits.length);
JsonArray clientNodeArray = new JsonArray();
for (SearchHit hit : response.getHits().getHits()) {
JsonObject clientNodeObj = new JsonObject();
clientNodeObj.addProperty("layer", (String) hit.getSource().get("layer"));
clientNodeObj.addProperty("component", (String) hit.getSource().get("component"));
clientNodeObj.addProperty("serverHost", (String) hit.getSource().get("serverHost"));
clientNodeObj.addProperty("timeSlice", (Long) hit.getSource().get("timeSlice"));
clientNodeArray.add(clientNodeObj);
}
return clientNodeArray;
}
public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) {
super(sliceType, timeSlice);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<ClientNodeSearchPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ClientNodeSearchPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new ClientNodeSearchPersistence(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ClientNodeSearchPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeInstanceIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
/**
* @author pengys5
*/
public class NodeInstanceSearchPersistence extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceSearchPersistence.class);
public NodeInstanceSearchPersistence(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
JsonObject resJsonObj = (JsonObject) response;
JsonArray result = search(search.getSliceType(), search.getTimeSlice());
resJsonObj.add("result", result);
} else {
throw new IllegalArgumentException("message instance must be RequestEntity");
}
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstanceIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.multiMatchQuery(timeSlice, "timeSlice"));
SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHit[] hits = response.getHits().getHits();
logger.debug("dag node list size: %s", hits.length);
JsonArray dagNodeArray = new JsonArray();
for (SearchHit hit : response.getHits().getHits()) {
JsonObject dagNodeObj = new JsonObject();
dagNodeObj.addProperty("code", (String) hit.getSource().get("code"));
dagNodeObj.addProperty("address", (String) hit.getSource().get("address"));
dagNodeObj.addProperty("timeSlice", (Long) hit.getSource().get("timeSlice"));
dagNodeArray.add(dagNodeObj);
}
return dagNodeArray;
}
public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) {
super(sliceType, timeSlice);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<NodeInstanceSearchPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeInstanceSearchPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceSearchPersistence(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstanceSearchPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeInstanceIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
/**
* @author pengys5
*/
public class NodeInstanceSummarySearchPersistence extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceSummarySearchPersistence.class);
public NodeInstanceSummarySearchPersistence(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
JsonObject resJsonObj = (JsonObject) response;
JsonArray result = search(search.getSliceType(), search.getTimeSlice());
resJsonObj.add("result", result);
} else {
throw new IllegalArgumentException("message instance must be RequestEntity");
}
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeInstanceIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.multiMatchQuery(timeSlice, "timeSlice"));
SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHit[] hits = response.getHits().getHits();
logger.debug("dag node list size: %s", hits.length);
JsonArray dagNodeArray = new JsonArray();
for (SearchHit hit : response.getHits().getHits()) {
JsonObject dagNodeObj = new JsonObject();
dagNodeObj.addProperty("code", (String) hit.getSource().get("code"));
dagNodeObj.addProperty("address", (String) hit.getSource().get("address"));
dagNodeObj.addProperty("timeSlice", (Long) hit.getSource().get("timeSlice"));
dagNodeArray.add(dagNodeObj);
}
return dagNodeArray;
}
public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) {
super(sliceType, timeSlice);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<NodeInstanceSummarySearchPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeInstanceSummarySearchPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceSummarySearchPersistence(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstanceSummarySearchPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeRefIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
/**
* @author pengys5
*/
public class NodeRefSearchPersistence extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefSearchPersistence.class);
public NodeRefSearchPersistence(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
JsonObject resJsonObj = (JsonObject) response;
JsonArray result = search(search.getSliceType(), search.getTimeSlice());
resJsonObj.add("result", result);
} else {
throw new IllegalArgumentException("message instance must be RequestEntity");
}
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(NodeRefIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery("timeSlice", timeSlice));
SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHit[] hits = response.getHits().getHits();
logger.debug("node reference list size: %s", hits.length);
JsonArray nodeRefArray = new JsonArray();
for (SearchHit hit : response.getHits().getHits()) {
JsonObject nodeRefObj = new JsonObject();
nodeRefObj.addProperty("client", (String) hit.getSource().get("client"));
nodeRefObj.addProperty("server", (String) hit.getSource().get("server"));
nodeRefObj.addProperty("timeSlice", (Long) hit.getSource().get("timeSlice"));
nodeRefArray.add(nodeRefObj);
}
return nodeRefArray;
}
public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) {
super(sliceType, timeSlice);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<NodeRefSearchPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefSearchPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefSearchPersistence(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefSearchPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.TimeSlice;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ServerNodeIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
/**
* @author pengys5
*/
public class ServerNodeSearchPersistence extends AbstractLocalSyncWorker {
private Logger logger = LogManager.getFormatterLogger(ServerNodeSearchPersistence.class);
public ServerNodeSearchPersistence(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
JsonObject resJsonObj = (JsonObject) response;
JsonArray result = search(search.getSliceType(), search.getTimeSlice());
resJsonObj.add("result", result);
} else {
throw new IllegalArgumentException("message instance must be RequestEntity");
}
}
public JsonArray search(String type, long timeSlice) {
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(ServerNodeIndex.Index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery("timeSlice", timeSlice));
SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHit[] hits = response.getHits().getHits();
logger.debug("server node list size: %s", hits.length);
JsonArray serverNodeArray = new JsonArray();
for (SearchHit hit : response.getHits().getHits()) {
JsonObject serverNodeObj = new JsonObject();
serverNodeObj.addProperty("code", (String) hit.getSource().get("code"));
serverNodeObj.addProperty("component", (String) hit.getSource().get("component"));
serverNodeObj.addProperty("layer", (String) hit.getSource().get("layer"));
serverNodeObj.addProperty("timeSlice", (Long) hit.getSource().get("timeSlice"));
serverNodeArray.add(serverNodeObj);
}
return serverNodeArray;
}
public static class RequestEntity extends TimeSlice {
public RequestEntity(String sliceType, long timeSlice) {
super(sliceType, timeSlice);
}
}
public static class Factory extends AbstractLocalSyncWorkerProvider<ServerNodeSearchPersistence> {
public static Factory INSTANCE = new Factory();
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServerNodeSearchPersistence workerInstance(ClusterWorkerContext clusterContext) {
return new ServerNodeSearchPersistence(role(), clusterContext, new LocalWorkerContext());
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServerNodeSearchPersistence.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.searcher;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.ClientNodeSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class ClientNodeWithTimeSliceSearcher extends AbstractSearcher {
private Logger logger = LogManager.getFormatterLogger(ClientNodeWithTimeSliceSearcher.class);
private ClientNodeWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(ClientNodeSearchPersistence.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType");
}
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType")));
long timeSlice;
try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type");
}
ClientNodeSearchPersistence.RequestEntity requestEntity;
requestEntity = new ClientNodeSearchPersistence.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice);
getSelfContext().lookup(ClientNodeSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<ClientNodeWithTimeSliceSearcher> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ClientNodeWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new ClientNodeWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/dagNode/search/clientNodeWithTimeSlice";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ClientNodeWithTimeSliceSearcher.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.searcher;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeInstanceSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeInstanceWithTimeSliceSearcher extends AbstractSearcher {
private Logger logger = LogManager.getFormatterLogger(NodeInstanceWithTimeSliceSearcher.class);
private NodeInstanceWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeInstanceSearchPersistence.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType");
}
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType")));
long timeSlice;
try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type");
}
NodeInstanceSearchPersistence.RequestEntity requestEntity;
requestEntity = new NodeInstanceSearchPersistence.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice);
getSelfContext().lookup(NodeInstanceSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<NodeInstanceWithTimeSliceSearcher> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeInstanceWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new NodeInstanceWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/dagNode/search/nodeInstanceWithTimeSlice";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeInstanceWithTimeSliceSearcher.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.searcher;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeRefSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeRefWithTimeSliceSearcher extends AbstractSearcher {
private Logger logger = LogManager.getFormatterLogger(NodeRefWithTimeSliceSearcher.class);
private NodeRefWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(NodeRefSearchPersistence.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType");
}
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType")));
long timeSlice;
try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type");
}
NodeRefSearchPersistence.RequestEntity requestEntity;
requestEntity = new NodeRefSearchPersistence.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice);
getSelfContext().lookup(NodeRefSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<NodeRefWithTimeSliceSearcher> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public NodeRefWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new NodeRefWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/dagNode/search/nodeRefWithTimeSlice";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return NodeRefWithTimeSliceSearcher.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.dagnode.searcher;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.dagnode.persistence.ServerNodeSearchPersistence;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcher;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider;
import com.a.eye.skywalking.collector.worker.tools.ParameterTools;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Map;
/**
* @author pengys5
*/
public class ServerNodeWithTimeSliceSearcher extends AbstractSearcher {
private Logger logger = LogManager.getFormatterLogger(ServerNodeWithTimeSliceSearcher.class);
private ServerNodeWithTimeSliceSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void preStart() throws ProviderNotFoundException {
getClusterContext().findProvider(ServerNodeSearchPersistence.WorkerRole.INSTANCE).create(this);
}
@Override
protected void onSearch(Map<String, String[]> request, JsonObject response) throws Exception {
if (!request.containsKey("timeSliceValue") || !request.containsKey("timeSliceType")) {
throw new IllegalArgumentException("the request parameter must contains timeSliceValue and timeSliceType");
}
logger.debug("timeSliceValue: %s, timeSliceType: %s", Arrays.toString(request.get("timeSliceValue")), Arrays.toString(request.get("timeSliceType")));
long timeSlice;
try {
timeSlice = Long.valueOf(ParameterTools.toString(request, "timeSliceValue"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("the request parameter timeSliceValue must numeric with long type");
}
ServerNodeSearchPersistence.RequestEntity requestEntity;
requestEntity = new ServerNodeSearchPersistence.RequestEntity(ParameterTools.toString(request, "timeSliceType"), timeSlice);
getSelfContext().lookup(ServerNodeSearchPersistence.WorkerRole.INSTANCE).ask(requestEntity, response);
}
public static class Factory extends AbstractSearcherProvider<ServerNodeWithTimeSliceSearcher> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServerNodeWithTimeSliceSearcher workerInstance(ClusterWorkerContext clusterContext) {
return new ServerNodeWithTimeSliceSearcher(role(), clusterContext, new LocalWorkerContext());
}
@Override
public String servletPath() {
return "/dagNode/search/serverNodeWithTimeSlice";
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServerNodeWithTimeSliceSearcher.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonObject;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
/**
* @author pengys5
*/
public abstract class AbstractHttpServlet extends HttpServlet {
final public void reply(HttpServletResponse response, JsonObject resJson, int status) throws IOException {
response.setContentType("text/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
PrintWriter out = response.getWriter();
out.print(resJson);
out.flush();
out.close();
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
* @author pengys5
*/
public abstract class AbstractReceiver extends AbstractLocalAsyncWorker {
public AbstractReceiver(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request) throws Exception {
if (request instanceof ReceiverMessage) {
ReceiverMessage receiverMessage = (ReceiverMessage) request;
onReceive(receiverMessage.request);
}
}
protected abstract void onReceive(JsonObject request) throws Exception;
static class ReceiveWithHttpServlet extends AbstractHttpServlet {
private final LocalAsyncWorkerRef ownerWorkerRef;
public ReceiveWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
// parse the "json" request parameter into the request payload instead of discarding it
JsonObject reqJson = new JsonParser().parse(request.getParameter("json")).getAsJsonObject();
ownerWorkerRef.tell(new ReceiverMessage(reqJson));
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
}
}
static class ReceiverMessage {
private final JsonObject request;
public ReceiverMessage(JsonObject request) {
this.request = request;
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
/**
* @author pengys5
*/
public abstract class AbstractReceiverProvider<T extends AbstractLocalAsyncWorker> extends AbstractLocalAsyncWorkerProvider<T> {
public abstract String servletPath();
final protected void createReceiver(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalAsyncWorkerRef workerRef = (LocalAsyncWorkerRef) super.create(AbstractWorker.noOwner());
AbstractReceiver.ReceiveWithHttpServlet receiveWithHttpServlet = new AbstractReceiver.ReceiveWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(receiveWithHttpServlet), servletPath());
}
@Override
final public T workerInstance(ClusterWorkerContext clusterContext) {
return null;
}
@Override
final public Role role() {
return null;
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.google.gson.JsonObject;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class AbstractSearcher extends AbstractLocalSyncWorker {
public AbstractSearcher(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request, Object response) throws Exception {
Map<String, String[]> parameterMap = (Map<String, String[]>) request;
onSearch(parameterMap, (JsonObject) response);
}
protected abstract void onSearch(Map<String, String[]> request, JsonObject response) throws Exception;
static class SearchWithHttpServlet extends AbstractHttpServlet {
private final LocalSyncWorkerRef ownerWorkerRef;
SearchWithHttpServlet(LocalSyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
Map<String, String[]> parameterMap = request.getParameterMap();
JsonObject resJson = new JsonObject();
try {
ownerWorkerRef.ask(parameterMap, resJson);
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
/**
* @author pengys5
*/
public abstract class AbstractSearcherProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalSyncWorkerProvider<T> {
public abstract String servletPath();
final protected void createSearcher(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) super.create(AbstractWorker.noOwner());
AbstractSearcher.SearchWithHttpServlet searchWithHttpServlet = new AbstractSearcher.SearchWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(searchWithHttpServlet), servletPath());
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.Role;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class Controller {
protected abstract NanoHTTPD.Method httpMethod();
protected abstract String path();
protected abstract JsonElement execute(Map<String, String> parms);
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.httpserver;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author pengys5
*/
public enum ControllerCenter {
INSTANCE;
private Map<String, Controller> getControllers = new ConcurrentHashMap<String, Controller>();
private Map<String, Controller> postControllers = new ConcurrentHashMap<String, Controller>();
protected void register(NanoHTTPD.Method method, String path, Controller controller) throws DuplicateControllerException {
if (NanoHTTPD.Method.GET.equals(method)) {
if (getControllers.containsKey(path)) {
throw new DuplicateControllerException("method: " + method + " with path: " + path + " duplicates an existing controller");
} else {
getControllers.put(path, controller);
}
} else if (NanoHTTPD.Method.POST.equals(method)) {
if (postControllers.containsKey(path)) {
throw new DuplicateControllerException("method: " + method + " with path: " + path + " duplicates an existing controller");
} else {
postControllers.put(path, controller);
}
}
}
protected Controller find(NanoHTTPD.Method method, String path) {
if (NanoHTTPD.Method.GET.equals(method)) {
return getControllers.get(path);
} else if (NanoHTTPD.Method.POST.equals(method)) {
return postControllers.get(path);
} else {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ControllerCreator {
INSTANCE;
public void boot() throws Exception {
ServiceLoader<ControllerProvider> controllerLoader = java.util.ServiceLoader.load(ControllerProvider.class);
for (ControllerProvider provider : controllerLoader) {
Controller controller = provider.create();
ControllerCenter.INSTANCE.register(controller.httpMethod(), controller.path(), controller);
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
public class ControllerNotFoundException extends Exception {
public ControllerNotFoundException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
/**
* @author pengys5
*/
public abstract class ControllerProvider {
public abstract Class clazz();
public Controller create() throws Exception {
Controller controller = (Controller) clazz().newInstance();
return controller;
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
public class DuplicateControllerException extends Exception {
public DuplicateControllerException(String message){
super(message);
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
/**
* @author pengys5
*/
public enum HttpServer {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(HttpServer.class);
public void boot(ClusterWorkerContext clusterContext) throws Exception {
Server server = new Server(7001);
String contextPath = "/skywalking";
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
logger.info("http server root context path: %s", contextPath);
ServletsCreator.INSTANCE.boot(servletContextHandler, clusterContext);
// ServerConnector serverConnector = new ServerConnector(server);
// serverConnector.setHost("127.0.0.1");
// serverConnector.setPort(7001);
// serverConnector.setIdleTimeout(5000);
server.setHandler(servletContextHandler);
server.start();
server.join();
}
}
\ No newline at end of file
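For reference, with the defaults above a servlet registered at "/receiver/traceSegment" (see TraceSegmentReceiver.Factory later in this commit) is reachable at http://localhost:7001/skywalking/receiver/traceSegment. Below is a minimal client sketch, assuming a locally running collector; the "json" payload goes into the query string because AbstractReceiver reads it through request.getParameter, which for a PUT request only sees query parameters.

import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class ReceiverClientSketch {
    public static void main(String[] args) throws Exception {
        String payload = URLEncoder.encode("{\"ts\":\"segment-1\"}", "UTF-8");
        URL url = new URL("http://localhost:7001/skywalking/receiver/traceSegment?json=" + payload);

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("PUT");

        // 200 when the receiver accepted the message, 500 with an "error" property otherwise
        System.out.println("status: " + connection.getResponseCode());
        connection.disconnect();
    }
}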
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public enum RequestDispatcher {
INSTANCE;
public JsonElement dispatch(NanoHTTPD.Method method, String uri, Map<String, String> parms) throws ControllerNotFoundException {
Controller controller = ControllerCenter.INSTANCE.find(method, uri);
if (controller != null) {
return controller.execute(parms);
} else {
throw new ControllerNotFoundException("Could not find a controller for [method: " + method.name() + ", uri: " + uri + "]");
}
}
}
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.servlet.ServletContextHandler;
import java.util.ServiceLoader;
/**
* @author pengys5
*/
public enum ServletsCreator {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(ServletsCreator.class);
public void boot(ServletContextHandler servletContextHandler, ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
ServiceLoader<AbstractReceiverProvider> receiverLoader = java.util.ServiceLoader.load(AbstractReceiverProvider.class);
for (AbstractReceiverProvider provider : receiverLoader) {
provider.setClusterContext(clusterContext);
provider.createReceiver(servletContextHandler);
logger.info("add receiver servlet mapping path: %s ", provider.servletPath());
}
ServiceLoader<AbstractSearcherProvider> searcherLoader = java.util.ServiceLoader.load(AbstractSearcherProvider.class);
for (AbstractSearcherProvider provider : searcherLoader) {
provider.setClusterContext(clusterContext);
provider.createSearcher(servletContextHandler);
logger.info("add searcher servlet mapping path: %s ", provider.servletPath());
}
}
}
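ServletsCreator uses the standard java.util.ServiceLoader lookup, so receiver and searcher providers only take effect once they are listed in provider-configuration files named after the abstract provider classes. A sketch of the expected registration follows; the file locations are dictated by ServiceLoader, while the entries are assumed from the classes introduced in this commit.

# META-INF/services/com.a.eye.skywalking.collector.worker.httpserver.AbstractReceiverProvider
com.a.eye.skywalking.collector.worker.receiver.TraceSegmentReceiver$Factory

# META-INF/services/com.a.eye.skywalking.collector.worker.httpserver.AbstractSearcherProvider
com.a.eye.skywalking.collector.worker.dagnode.searcher.ClientNodeWithTimeSliceSearcher$Factory
com.a.eye.skywalking.collector.worker.dagnode.searcher.ServerNodeWithTimeSliceSearcher$Factory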
package com.a.eye.skywalking.collector.worker.httpserver;
import com.google.gson.JsonElement;
import fi.iki.elonen.NanoHTTPD;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Map;
/**
* @author pengys5
*/
public enum WebServer {
INSTANCE;
private Logger logger = LogManager.getFormatterLogger(WebServer.class);
public void boot() throws Exception {
NanoHttpServer server = new NanoHttpServer(7001);
ControllerCreator.INSTANCE.boot();
}
public class NanoHttpServer extends NanoHTTPD {
public NanoHttpServer(int port) throws IOException {
super(port);
start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
logger.info("Running! Point your browsers to http://localhost:%d/", port);
}
@Override
public Response serve(IHTTPSession session) {
Method method = session.getMethod();
String uri = session.getUri();
Map<String, String> parms = session.getParms();
logger.debug("request method: %s, uri: %s, parms: %s", method.toString(), uri, parms);
try {
JsonElement response = RequestDispatcher.INSTANCE.dispatch(method, uri, parms);
return newFixedLengthResponse(Response.Status.OK, "text/json", response.toString());
} catch (ControllerNotFoundException e) {
String errorMessage = e.getMessage();
return newFixedLengthResponse(Response.Status.NOT_FOUND, "text/html", errorMessage);
}
}
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.httpserver.controller;
import com.a.eye.skywalking.collector.worker.httpserver.Controller;
import com.a.eye.skywalking.collector.worker.httpserver.ControllerProvider;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public class DagController extends Controller {
@Override
public NanoHTTPD.Method httpMethod() {
return NanoHTTPD.Method.GET;
}
@Override
public String path() {
return "/getNodes";
}
@Override
public JsonElement execute(Map<String, String> parms) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("test", "aaaa");
return jsonObject;
}
public static class Factory extends ControllerProvider {
@Override
public Class clazz() {
return DagController.class;
}
}
}
package com.a.eye.skywalking.collector.worker.receiver;
/**
* @author pengys5
*/
public class TraceSegmentKeyMapping {
public static class TraceSegment {
public static String traceSegmentId = "ts";
}
}
package com.a.eye.skywalking.collector.worker.receiver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.commons.role.TraceSegmentReceiverRole;
import com.a.eye.skywalking.collector.worker.WorkerConfig;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.application.ApplicationMain;
import com.a.eye.skywalking.collector.worker.applicationref.ApplicationRefMain;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractReceiver;
import com.a.eye.skywalking.collector.worker.httpserver.AbstractReceiverProvider;
import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
import com.a.eye.skywalking.collector.worker.tools.DateTools;
import com.a.eye.skywalking.trace.TraceSegment;
import com.google.gson.JsonObject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class TraceSegmentReceiver extends AbstractReceiver {
private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class);
......@@ -29,35 +33,27 @@ public class TraceSegmentReceiver extends AbstractClusterWorker {
}
@Override
protected void onReceive(JsonObject request) throws Exception {
logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", request.get("ts"));
// long timeSlice = DateTools.timeStampToTimeSlice(traceSegment.getStartTime());
// int second = DateTools.timeStampToSecond(traceSegment.getStartTime());
//
// TraceSegmentTimeSlice segmentTimeSlice = new TraceSegmentTimeSlice(timeSlice, second, traceSegment);
// getSelfContext().lookup(ApplicationMain.Role.INSTANCE).tell(segmentTimeSlice);
// getSelfContext().lookup(ApplicationRefMain.Role.INSTANCE).tell(segmentTimeSlice);
}
public static class Factory extends AbstractReceiverProvider<TraceSegmentReceiver> {
public static Factory INSTANCE = new Factory();
@Override
public String servletPath() {
return "/receiver/traceSegment";
}
@Override
public int queueSize() {
return 128;
}
}
......
......@@ -2,6 +2,8 @@ package com.a.eye.skywalking.collector.worker.storage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
......@@ -31,4 +33,14 @@ public class EsClient {
public static Client getClient() {
return client;
}
public static void indexRefresh(String... indexName) {
RefreshResponse response = client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
if (response.getShardFailures().length == response.getTotalShards()) {
logger.error("elasticsearch index refresh failed on all shards, reason: %s", response.getShardFailures());
} else if (response.getShardFailures().length > 0) {
logger.error("elasticsearch index refresh failed on part of the shards, reason: %s", response.getShardFailures());
} else {
logger.info("elasticsearch index refresh succeeded");
}
}
}
......@@ -20,18 +20,22 @@ public abstract class AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(AbstractIndex.class);
public static final String Type_Minute = "minute";
public static final String Type_Hour = "hour";
public static final String Type_Day = "day";
final public XContentBuilder createSettingBuilder() throws IOException {
XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
.startObject()
.field("index.number_of_shards", 2)
.field("index.number_of_replicas", 0)
.field("index.number_of_shards", 2)
.field("index.number_of_replicas", 0)
.endObject();
return settingsBuilder;
}
public abstract XContentBuilder createMappingBuilder() throws IOException;
final public boolean createIndex() {
final public void createIndex() {
// settings
String settingSource = "";
......@@ -45,13 +49,15 @@ public abstract class AbstractIndex {
mappingBuilder = createMappingBuilder();
logger.info("mapping builder str: %s", mappingBuilder.string());
} catch (Exception e) {
logger.error("create %s index type of %s mapping builder error", index(), type());
logger.error("create %s index mapping builder error", index());
}
Settings settings = Settings.builder().loadFromSource(settingSource).build();
IndicesAdminClient client = EsClient.getClient().admin().indices();
CreateIndexResponse response = client.prepareCreate(index()).setSettings(settings).addMapping(Type_Minute, mappingBuilder).get();
client.preparePutMapping(index()).setType(Type_Hour).setSource(mappingBuilder).get();
client.preparePutMapping(index()).setType(Type_Day).setSource(mappingBuilder).get();
logger.info("create %s index with types %s, %s, %s finished, isAcknowledged: %s", index(), Type_Minute, Type_Hour, Type_Day, response.isAcknowledged());
}
final public boolean deleteIndex() {
......@@ -67,6 +73,4 @@ public abstract class AbstractIndex {
}
public abstract String index();
}
package com.a.eye.skywalking.collector.worker.storage.index;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
/**
* @author pengys5
*/
public class ClientNodeIndex extends AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(ClientNodeIndex.class);
public static final String Index = "client_node_idx";
@Override
public String index() {
return Index;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("layer")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("component")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("serverHost")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("timeSlice")
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -10,23 +10,17 @@ import java.io.IOException;
/**
* @author pengys5
*/
public class NodeInstanceIndex extends AbstractIndex {
private static Logger logger = LogManager.getFormatterLogger(NodeInstanceIndex.class);
public static final String Index = "node_instance_idx";
@Override
public String index() {
return Index;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
......@@ -34,7 +28,6 @@ public class ApplicationIndexWithNodeInstType extends AbstractIndex {
.startObject("properties")
.startObject("code")
.field("type", "string")
.field("fielddata", true)
.field("index", "not_analyzed")
.endObject()
.startObject("address")
......@@ -42,7 +35,7 @@ public class ApplicationIndexWithNodeInstType extends AbstractIndex {
.field("index", "not_analyzed")
.endObject()
.startObject("timeSlice")
.field("type", "long_range")
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
......
package com.a.eye.skywalking.collector.worker.storage.index;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
/**
* @author pengys5
*/
public class NodeRefIndex extends AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(NodeRefIndex.class);
public static final String Index = "node_ref_idx";
@Override
public String index() {
return Index;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("client")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("server")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("timeSlice")
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -10,23 +10,18 @@ import java.io.IOException;
/**
* @author pengys5
*/
public class ServerNodeIndex extends AbstractIndex {
private Logger logger = LogManager.getFormatterLogger(ServerNodeIndex.class);
public static final String Index = "server_node_idx";
@Override
public String index() {
return Index;
}
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
......@@ -36,11 +31,11 @@ public class ApplicationIndexWithDagNodeType extends AbstractIndex {
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("layer")
.startObject("layer")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("component")
.startObject("component")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
......
package com.a.eye.skywalking.collector.worker.tools;
import java.util.Map;
/**
* @author pengys5
*/
public class ParameterTools {
public static String toString(Map<String, String[]> request, String key) {
if (request.get(key) != null) {
return request.get(key)[0];
} else {
return "";
}
}
}
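A brief usage sketch for ParameterTools (names are illustrative): it unwraps the String[] values of a servlet parameter map and falls back to an empty string when a key is absent, which keeps onSearch implementations free of null checks.

package com.a.eye.skywalking.collector.worker.tools;

import java.util.HashMap;
import java.util.Map;

public class ParameterToolsUsage {
    public static void main(String[] args) {
        Map<String, String[]> request = new HashMap<String, String[]>();
        request.put("timeSliceType", new String[]{"minute"});

        System.out.println(ParameterTools.toString(request, "timeSliceType")); // prints "minute"
        System.out.println(ParameterTools.toString(request, "startTime"));     // prints "" (key missing)
    }
}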
package com.a.eye.skywalking.collector.worker.web.controller.tracedag;
import com.a.eye.skywalking.collector.worker.httpserver.Controller;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import fi.iki.elonen.NanoHTTPD;
import java.util.Map;
/**
* @author pengys5
*/
public class TraceDagLoadController extends Controller {
@Override
protected NanoHTTPD.Method httpMethod() {
return NanoHTTPD.Method.GET;
}
@Override
protected String path() {
return "/traceDagLoad";
}
@Override
public JsonElement execute(Map<String, String> parms) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("test", "aaaa");
return jsonObject;
}
}
package com.a.eye.skywalking.collector.worker.web.controller.tracedag;
/**
* @author pengys5
*/
public class TraceDagUpdateController {
}
package com.a.eye.skywalking.collector.worker.web.persistence;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ApplicationIndexWithDagNodeType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class ApplicationPersistence {
private Logger logger = LogManager.getFormatterLogger(ApplicationPersistence.class);
public void searchDagNode(long startTimeSlice, long endTimeSlice){
SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch(ApplicationIndexWithDagNodeType.Index);
searchRequestBuilder.setTypes(ApplicationIndexWithDagNodeType.Type);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
// searchRequestBuilder.setQuery(QueryBuilders.rangeQuery("timeSlice").gte(startTimeSlice).lte(endTimeSlice));
ConstantScoreQueryBuilder constantScoreQueryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.rangeQuery("timeSlice").gte(startTimeSlice).lte(endTimeSlice));
searchRequestBuilder.setQuery(constantScoreQueryBuilder);
//// GlobalAggregationBuilder aggregationBuilder = AggregationBuilders.global("agg").subAggregation(AggregationBuilders.terms("distinct_code").field("code"));
// TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("distinct_code").field("code");
searchRequestBuilder.addAggregation(AggregationBuilders.terms("distinct_code").field("code"));
SearchResponse response = searchRequestBuilder.execute().actionGet();
Terms distinctCode = response.getAggregations().get("distinct_code");
logger.debug("dag node list size: %s", distinctCode.getBuckets().size());
for (Terms.Bucket bucket : distinctCode.getBuckets()) {
logger.debug("code: %s, count: %s", bucket.getKeyAsString(), bucket.getDocCount());
}
}
}
package com.a.eye.skywalking.collector.worker.web.persistence;
/**
* @author pengys5
*/
public class NodeInstancePersistence {
}
com.a.eye.skywalking.collector.worker.dagnode.persistence.ClientNodeSearchPersistence$Factory
com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeInstanceSearchPersistence$Factory
com.a.eye.skywalking.collector.worker.dagnode.persistence.NodeRefSearchPersistence$Factory
com.a.eye.skywalking.collector.worker.dagnode.persistence.ServerNodeSearchPersistence$Factory
\ No newline at end of file
com.a.eye.skywalking.collector.worker.dagnode.searcher.ClientNodeWithTimeSliceSearcher$Factory
com.a.eye.skywalking.collector.worker.dagnode.searcher.NodeInstanceWithTimeSliceSearcher$Factory
com.a.eye.skywalking.collector.worker.dagnode.searcher.NodeRefWithTimeSliceSearcher$Factory
com.a.eye.skywalking.collector.worker.dagnode.searcher.ServerNodeWithTimeSliceSearcher$Factory
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ClientNodeIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class ClientNodeTestCase {
@Before
public void initIndex() throws UnknownHostException {
EsClient.boot();
ClientNodeIndex index = new ClientNodeIndex();
index.deleteIndex();
index.createIndex();
}
@Test
public void testLoadClientNode() throws Exception {
loadNode(201703101201l, ClientNodeIndex.Type_Minute);
loadNode(201703101200l, ClientNodeIndex.Type_Hour);
loadNode(201703100000l, ClientNodeIndex.Type_Day);
}
public void loadNode(long timeSlice, String type) throws Exception {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) ClientNodeSearchPersistence.Factory.INSTANCE.create(AbstractWorker.noOwner());
insertData(timeSlice, type);
EsClient.indexRefresh(ClientNodeIndex.Index);
ClientNodeSearchPersistence.RequestEntity requestEntity = new ClientNodeSearchPersistence.RequestEntity(type, timeSlice);
JsonObject resJsonObj = new JsonObject();
workerRef.ask(requestEntity, resJsonObj);
JsonArray nodeArray = resJsonObj.get("result").getAsJsonArray();
for (int i = 0; i < nodeArray.size(); i++) {
JsonObject node = nodeArray.get(i).getAsJsonObject();
System.out.println(node);
}
}
private void insertData(long timeSlice, String type) {
Map<String, Object> json = new HashMap<String, Object>();
json.put("serverHost", "10.20.3.15:3000");
json.put("timeSlice", timeSlice);
json.put("component", "Motan");
json.put("layer", "rpc");
String _id = timeSlice + "-Motan-10.20.3.15:3000";
IndexResponse response = EsClient.getClient().prepareIndex(ClientNodeIndex.Index, type, _id).setSource(json).get();
RestStatus status = response.status();
status.getStatus();
json = new HashMap<String, Object>();
json.put("serverHost", "10.5.34.18");
json.put("timeSlice", timeSlice);
json.put("component", "Mysql");
json.put("layer", "db");
_id = timeSlice + "-Mysql-10.5.34.18";
response = EsClient.getClient().prepareIndex(ClientNodeIndex.Index, type, _id).setSource(json).get();
status = response.status();
status.getStatus();
}
}
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeInstanceIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeInstanceTestCase {
@Before
public void initIndex() throws UnknownHostException {
EsClient.boot();
NodeInstanceIndex index = new NodeInstanceIndex();
index.deleteIndex();
index.createIndex();
}
@Test
public void testLoadNodeInstance() throws Exception {
loadNodeInstance(201703101201l, NodeInstanceIndex.Type_Minute);
loadNodeInstance(201703101200l, NodeInstanceIndex.Type_Hour);
loadNodeInstance(201703100000l, NodeInstanceIndex.Type_Day);
}
public void loadNodeInstance(long timeSlice, String type) throws Exception {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) NodeInstanceSearchPersistence.Factory.INSTANCE.create(AbstractWorker.noOwner());
insertData(timeSlice, type);
EsClient.indexRefresh(NodeInstanceIndex.Index);
NodeInstanceSearchPersistence.RequestEntity requestEntity = new NodeInstanceSearchPersistence.RequestEntity(type, timeSlice);
JsonObject resJsonObj = new JsonObject();
workerRef.ask(requestEntity, resJsonObj);
JsonArray nodeArray = resJsonObj.get("result").getAsJsonArray();
for (int i = 0; i < nodeArray.size(); i++) {
JsonObject node = nodeArray.get(i).getAsJsonObject();
System.out.println(node);
}
}
private void insertData(long timeSlice, String type) {
Map<String, Object> json = new HashMap<String, Object>();
json.put("code", "WebApplication");
json.put("address", "10.218.9.86:8080");
json.put("timeSlice", timeSlice);
String _id = timeSlice + "-WebApplication-" + "10.218.9.86:8080";
IndexResponse response = EsClient.getClient().prepareIndex(NodeInstanceIndex.Index, type, _id).setSource(json).get();
RestStatus status = response.status();
status.getStatus();
json = new HashMap<String, Object>();
json.put("code", "MotanServiceApplication");
json.put("address", "10.20.3.15:3000");
json.put("timeSlice", timeSlice);
_id = timeSlice + "-MotanServiceApplication-" + "10.20.3.15:3000";
response = EsClient.getClient().prepareIndex(NodeInstanceIndex.Index, type, _id).setSource(json).get();
status = response.status();
status.getStatus();
}
}
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.NodeRefIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeRefTestCase {
@Before
public void initIndex() throws UnknownHostException {
EsClient.boot();
NodeRefIndex index = new NodeRefIndex();
index.deleteIndex();
index.createIndex();
}
@Test
public void testLoadNodeRef() throws Exception {
loadNodeRef(201703101201l, NodeRefIndex.Type_Minute);
loadNodeRef(201703101200l, NodeRefIndex.Type_Hour);
loadNodeRef(201703100000l, NodeRefIndex.Type_Day);
}
public void loadNodeRef(long timeSlice, String type) throws Exception {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) NodeRefSearchPersistence.Factory.INSTANCE.create(AbstractWorker.noOwner());
insertData(timeSlice, type);
EsClient.indexRefresh(NodeRefIndex.Index);
NodeRefSearchPersistence.RequestEntity requestEntity = new NodeRefSearchPersistence.RequestEntity(type, timeSlice);
JsonObject resJsonObj = new JsonObject();
workerRef.ask(requestEntity, resJsonObj);
JsonArray nodeArray = resJsonObj.get("result").getAsJsonArray();
for (int i = 0; i < nodeArray.size(); i++) {
JsonObject node = nodeArray.get(i).getAsJsonObject();
System.out.println(node);
}
}
private void insertData(long timeSlice, String type) {
Map<String, Object> json = new HashMap<String, Object>();
json.put("client", "WebApplication");
json.put("server", "MotanServiceApplication");
json.put("timeSlice", timeSlice);
String _id = timeSlice + "-WebApplication-MotanServiceApplication";
IndexResponse response = EsClient.getClient().prepareIndex(NodeRefIndex.Index, type, _id).setSource(json).get();
RestStatus status = response.status();
status.getStatus();
}
}
package com.a.eye.skywalking.collector.worker.dagnode.persistence;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.LocalSyncWorkerRef;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ServerNodeIndex;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class ServerNodeTestCase {
@Before
public void initIndex() throws UnknownHostException {
EsClient.boot();
ServerNodeIndex index = new ServerNodeIndex();
index.deleteIndex();
index.createIndex();
}
@Test
public void testLoadServerNode() throws Exception {
loadNode(201703101201l, ServerNodeIndex.Type_Minute);
loadNode(201703101200l, ServerNodeIndex.Type_Hour);
loadNode(201703100000l, ServerNodeIndex.Type_Day);
}
public void loadNode(long timeSlice, String type) throws Exception {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) ServerNodeSearchPersistence.Factory.INSTANCE.create(AbstractWorker.noOwner());
insertData(timeSlice, type);
EsClient.indexRefresh(ServerNodeIndex.Index);
ServerNodeSearchPersistence.RequestEntity requestEntity = new ServerNodeSearchPersistence.RequestEntity(type, timeSlice);
JsonObject resJsonObj = new JsonObject();
workerRef.ask(requestEntity, resJsonObj);
JsonArray nodeArray = resJsonObj.get("result").getAsJsonArray();
for (int i = 0; i < nodeArray.size(); i++) {
JsonObject node = nodeArray.get(i).getAsJsonObject();
System.out.println(node);
}
}
private void insertData(long timeSlice, String type) {
Map<String, Object> json = new HashMap<String, Object>();
json.put("code", "WebApplication");
json.put("timeSlice", timeSlice);
json.put("component", "Tomcat");
json.put("layer", "http");
String _id = timeSlice + "-WebApplication";
IndexResponse response = EsClient.getClient().prepareIndex(ServerNodeIndex.Index, type, _id).setSource(json).get();
RestStatus status = response.status();
status.getStatus();
json = new HashMap<String, Object>();
json.put("code", "MotanServiceApplication");
json.put("timeSlice", timeSlice);
json.put("component", "Motan");
json.put("layer", "rpc");
_id = timeSlice + "-MotanServiceApplication";
response = EsClient.getClient().prepareIndex(ServerNodeIndex.Index, type, _id).setSource(json).get();
status = response.status();
status.getStatus();
}
}
package com.a.eye.skywalking.collector.worker.web.persistence;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.index.ApplicationIndexWithDagNodeType;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;
import org.junit.Before;
import org.junit.Test;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class ApplicationPersistenceTestCase {
@Before
public void initIndex() throws UnknownHostException {
EsClient.boot();
ApplicationIndexWithDagNodeType index = new ApplicationIndexWithDagNodeType();
index.deleteIndex();
index.createIndex();
insertData();
}
@Test
public void testSearchDagNode() {
ApplicationPersistence persistence = new ApplicationPersistence();
persistence.searchDagNode(201703101200l, 201703101259l);
}
private void insertData() {
long minute = 201703101200l;
for (int i = 0; i < 60; i++) {
Map<String, Object> json = new HashMap<String, Object>();
json.put("code", "DubboServer_MySQL");
json.put("timeSlice", minute);
json.put("component", "Dubbo");
json.put("layer", "http");
String _id = minute + "-DubboServer_MySQL";
IndexResponse response = EsClient.getClient().prepareIndex(ApplicationIndexWithDagNodeType.Index, ApplicationIndexWithDagNodeType.Type, _id).setSource(json).get();
RestStatus status = response.status();
status.getStatus();
minute++;
}
}
}