提交 4101ca02 编写于 作者: B baiyang

Merge remote-tracking branch 'upstream/master'

......@@ -631,8 +631,8 @@ to attach them to the start of each source file to most effectively
state the exclusion of warranty; and each file should have at least
the "copyright" line and a pointer to where the full notice is found.
{one line to give the program's name and a brief idea of what it does.}
Copyright (C) {year} {name of author}
sky-walking APM
Copyright (C) 2015-2017 Wu Sheng
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -652,7 +652,7 @@ Also add information on how to contact you by electronic and paper mail.
If the program does terminal interaction, make it output a short
notice like this when it starts in an interactive mode:
{project} Copyright (C) {year} {fullname}
sky-walking Copyright (C) 2015-2017 Wu Sheng
This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
This is free software, and you are welcome to redistribute it
under certain conditions; type `show c' for details.
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<!DOCTYPE module PUBLIC
    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
  Checkstyle configuration for the project.
  Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding
-->
<module name="Checker">
    <property name="localeLanguage" value="en"/>

    <!-- File-level checks (run on the raw file, outside the TreeWalker). -->
    <!-- Report any tab character found in source files. -->
    <module name="FileTabCharacter"/>
    <!-- Forbid System.out.println in production sources. -->
    <module name="RegexpSingleline">
        <property name="format" value="System\.out\.println"/>
        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
    </module>
    <!-- Forbid CJK characters anywhere in source files (comments included). -->
    <module name="RegexpSingleline">
        <property name="format"
            value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
        <property name="message" value="Not allow chinese character !"/>
    </module>
    <!-- Cap files at 3000 lines. -->
    <module name="FileLength">
        <property name="max" value="3000"/>
    </module>

    <!-- AST-level checks. -->
    <module name="TreeWalker">
        <!-- Imports -->
        <module name="UnusedImports">
            <property name="processJavadoc" value="true"/>
        </module>
        <module name="RedundantImport"/>

        <!-- Checks that classes that override equals() also override hashCode() -->
        <module name="EqualsHashCode"/>
        <!-- Checks for over-complicated boolean expressions, e.g. if (x == true), x || true, !false -->
        <module name="SimplifyBooleanExpression"/>
        <module name="OneStatementPerLine"/>
        <module name="UnnecessaryParentheses"/>
        <!-- Checks for over-complicated boolean return statements -->
        <module name="SimplifyBooleanReturn"/>
        <!-- Check that the default case comes after all other cases in a switch statement -->
        <module name="DefaultComesLast"/>
        <!-- Detects empty statements (standalone ";" semicolon) -->
        <module name="EmptyStatement"/>
        <!-- Checks that long constants are defined with an upper "L" -->
        <module name="UpperEll"/>

        <!-- Naming conventions. Constants are UPPER_SNAKE_CASE, except the
             conventional lowercase "logger" field, which is explicitly allowed. -->
        <module name="ConstantName">
            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^logger)"/>
        </module>
        <!-- Checks that local, non-final variable names conform to the default format -->
        <module name="LocalVariableName"/>
        <!-- Validates identifiers for local, final variables, including catch parameters -->
        <module name="LocalFinalVariableName"/>
        <!-- Validates identifiers for non-static fields -->
        <module name="MemberName"/>
        <!-- Validates identifiers for class type parameters -->
        <module name="ClassTypeParameterName">
            <property name="format" value="^[A-Z0-9]*$"/>
        </module>
        <!-- Validates identifiers for method type parameters -->
        <module name="MethodTypeParameterName">
            <property name="format" value="^[A-Z0-9]*$"/>
        </module>
        <module name="PackageName"/>
        <module name="ParameterName"/>
        <module name="StaticVariableName">
            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
        </module>
        <module name="TypeName">
            <property name="format" value="(^[A-Z][a-zA-Z0-9]*$)|(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)"/>
        </module>

        <!-- Whitespace -->
        <module name="GenericWhitespace"/>
        <module name="NoWhitespaceBefore"/>
        <module name="NoWhitespaceAfter"/>
        <module name="WhitespaceAround">
            <property name="allowEmptyConstructors" value="true"/>
            <property name="allowEmptyMethods" value="true"/>
        </module>
        <module name="Indentation"/>
        <module name="MethodParamPad"/>
        <module name="ParenPad"/>
        <!-- NOTE: was declared twice in the original; the duplicate has been removed. -->
        <module name="TypecastParenPad"/>
    </module>
</module>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- IntelliJ IDEA code-style scheme ("code_scheme" XML) named "sky-walking".
     Defines project-wide formatter settings plus per-language overrides below. -->
<code_scheme name="sky-walking">
<option name="USE_SAME_INDENTS" value="true"/>
<option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
<!-- Default indentation: 4 spaces, no tab characters. -->
<option name="OTHER_INDENT_OPTIONS">
<value>
<option name="INDENT_SIZE" value="4"/>
<option name="CONTINUATION_INDENT_SIZE" value="4"/>
<option name="TAB_SIZE" value="4"/>
<option name="USE_TAB_CHARACTER" value="false"/>
<option name="SMART_TABS" value="false"/>
<option name="LABEL_INDENT_SIZE" value="0"/>
<option name="LABEL_INDENT_ABSOLUTE" value="false"/>
<option name="USE_RELATIVE_INDENTS" value="false"/>
</value>
</option>
<option name="PREFER_LONGER_NAMES" value="false"/>
<!-- Thresholds of 1000 effectively disable wildcard ("on demand") imports. -->
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
<option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
<value/>
</option>
<!-- Import order: regular imports first, blank line, then static imports. -->
<option name="IMPORT_LAYOUT_TABLE">
<value>
<package name="" withSubpackages="true" static="false"/>
<emptyLine/>
<package name="" withSubpackages="true" static="true"/>
</value>
</option>
<!-- Javadoc formatting options (JD_*). -->
<option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
<option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
<option name="JD_P_AT_EMPTY_LINES" value="false"/>
<option name="JD_KEEP_INVALID_TAGS" value="false"/>
<option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
<!-- General wrapping / blank-line policy (at most one consecutive blank line). -->
<option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
<option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
<option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
<option name="WHILE_ON_NEW_LINE" value="true"/>
<option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
<option name="ALIGN_MULTILINE_FOR" value="false"/>
<option name="SPACE_AFTER_TYPE_CAST" value="false"/>
<option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
<option name="METHOD_PARAMETERS_WRAP" value="1"/>
<option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
<option name="LABELED_STATEMENT_WRAP" value="1"/>
<option name="WRAP_COMMENTS" value="true"/>
<option name="METHOD_ANNOTATION_WRAP" value="1"/>
<option name="CLASS_ANNOTATION_WRAP" value="1"/>
<option name="FIELD_ANNOTATION_WRAP" value="1"/>
<JavaCodeStyleSettings>
<option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
</JavaCodeStyleSettings>
<XML>
<option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
</XML>
<ADDITIONAL_INDENT_OPTIONS fileType="haml">
<option name="INDENT_SIZE" value="2"/>
</ADDITIONAL_INDENT_OPTIONS>
<!-- Per-language overrides; PARENT_SETTINGS_INSTALLED marks inheritance from the
     scheme-wide settings above. -->
<codeStyleSettings language="Groovy">
<option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
<option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
<option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
<option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
<option name="ALIGN_MULTILINE_FOR" value="false"/>
<option name="SPACE_AFTER_TYPE_CAST" value="false"/>
<option name="METHOD_PARAMETERS_WRAP" value="1"/>
<option name="METHOD_ANNOTATION_WRAP" value="1"/>
<option name="CLASS_ANNOTATION_WRAP" value="1"/>
<option name="FIELD_ANNOTATION_WRAP" value="1"/>
<option name="PARENT_SETTINGS_INSTALLED" value="true"/>
<indentOptions>
<option name="CONTINUATION_INDENT_SIZE" value="4"/>
</indentOptions>
</codeStyleSettings>
<codeStyleSettings language="HOCON">
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
<option name="PARENT_SETTINGS_INSTALLED" value="true"/>
</codeStyleSettings>
<codeStyleSettings language="JAVA">
<option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
<option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
<option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
<option name="WHILE_ON_NEW_LINE" value="true"/>
<option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
<option name="ALIGN_MULTILINE_FOR" value="false"/>
<option name="SPACE_AFTER_TYPE_CAST" value="false"/>
<option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
<option name="METHOD_PARAMETERS_WRAP" value="1"/>
<option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
<option name="LABELED_STATEMENT_WRAP" value="1"/>
<option name="METHOD_ANNOTATION_WRAP" value="1"/>
<option name="CLASS_ANNOTATION_WRAP" value="1"/>
<option name="FIELD_ANNOTATION_WRAP" value="1"/>
<option name="PARENT_SETTINGS_INSTALLED" value="true"/>
<indentOptions>
<option name="CONTINUATION_INDENT_SIZE" value="4"/>
</indentOptions>
</codeStyleSettings>
<codeStyleSettings language="JSON">
<option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
<option name="PARENT_SETTINGS_INSTALLED" value="true"/>
</codeStyleSettings>
<codeStyleSettings language="Scala">
<option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
<option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
<option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
<option name="WHILE_ON_NEW_LINE" value="true"/>
<option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
<option name="ALIGN_MULTILINE_FOR" value="false"/>
<option name="METHOD_PARAMETERS_WRAP" value="1"/>
<option name="METHOD_ANNOTATION_WRAP" value="1"/>
<option name="CLASS_ANNOTATION_WRAP" value="1"/>
<option name="FIELD_ANNOTATION_WRAP" value="1"/>
<option name="PARENT_SETTINGS_INSTALLED" value="true"/>
<indentOptions>
<option name="INDENT_SIZE" value="4"/>
<option name="CONTINUATION_INDENT_SIZE" value="4"/>
<option name="TAB_SIZE" value="4"/>
</indentOptions>
</codeStyleSettings>
<codeStyleSettings language="XML">
<indentOptions>
<option name="CONTINUATION_INDENT_SIZE" value="4"/>
</indentOptions>
</codeStyleSettings>
</code_scheme>
......@@ -190,6 +190,25 @@
<skipDocker>true</skipDocker>
</configuration>
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<configLocation>checkStyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -45,7 +45,8 @@
<repository>
<id>bintray-wu-sheng-sky-walking-repository</id>
<name>wu-sheng-sky-walking-repository</name>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-log4j-1.x/;publish=1</url>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-log4j-1.x/;publish=1
</url>
</repository>
</distributionManagement>
</project>
package com.a.eye.skywalking.toolkit.log.log4j.v1.x;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.helpers.PatternParser;
import org.apache.log4j.spi.LoggingEvent;
/**
* The log4j extend pattern.
* By using this pattern, if sky-walking agent is also active, {@link PatternParser#finalizeConverter(char)} method will be override dynamic.
* <p>
* Created by wusheng on 2016/12/7.
* The log4j extended pattern. By using this pattern, if the sky-walking agent is also active, the {@link
* PatternParser#finalizeConverter(char)} method will be overridden dynamically. <p>
*
* @author wusheng
*/
public class TraceIdPatternLayout extends PatternLayout {
@Override
......
......@@ -45,7 +45,8 @@
<repository>
<id>bintray-wu-sheng-sky-walking-repository</id>
<name>wu-sheng-sky-walking-repository</name>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-log4j-2.x/;publish=1</url>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-log4j-2.x/;publish=1
</url>
</repository>
</distributionManagement>
</project>
package com.a.eye.skywalking.toolkit.log.log4j.v2.x;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.pattern.ConverterKeys;
......@@ -21,7 +20,7 @@ public class TraceIdConverter extends LogEventPatternConverter {
/**
* Constructs an instance of LoggingEventPatternConverter.
*
* @param name name of converter.
* @param name name of converter.
* @param style CSS style for output.
*/
protected TraceIdConverter(String name, String style) {
......
......@@ -45,7 +45,8 @@
<repository>
<id>bintray-wu-sheng-sky-walking-repository</id>
<name>wu-sheng-sky-walking-repository</name>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-logback-1.x/;publish=1</url>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-logback-1.x/;publish=1
</url>
</repository>
</distributionManagement>
</project>
......@@ -28,7 +28,8 @@
<repository>
<id>bintray-wu-sheng-sky-walking-repository</id>
<name>wu-sheng-sky-walking-repository</name>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-opentracing/;publish=1</url>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-opentracing/;publish=1
</url>
</repository>
</distributionManagement>
</project>
......@@ -18,7 +18,7 @@ public class SkyWalkingSpan implements Span, SpanContext {
private final Map<String, String> baggageItems;
SkyWalkingSpan(String operationName, long startTime, Map<String, String> tags){
SkyWalkingSpan(String operationName, long startTime, Map<String, String> tags) {
this.operationName = operationName;
this.startTime = startTime;
this.tags = tags;
......
......@@ -25,7 +25,7 @@ public class SkyWalkingSpanBuilder implements Tracer.SpanBuilder {
private SpanContext parentContext;
SkyWalkingSpanBuilder(String operationName){
SkyWalkingSpanBuilder(String operationName) {
this.operationName = operationName;
this.tags = new HashMap<String, String>();
}
......@@ -44,6 +44,7 @@ public class SkyWalkingSpanBuilder implements Tracer.SpanBuilder {
/**
* In SkyWalkingTracer, Parent Span will not be used. Tracer will build reference by itself.
*
* @param span
* @return
*/
......@@ -94,7 +95,7 @@ public class SkyWalkingSpanBuilder implements Tracer.SpanBuilder {
@Override
public Span start() {
if (startTime == 0){
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
return new SkyWalkingSpan(this.operationName, this.startTime, this.tags);
......@@ -103,7 +104,7 @@ public class SkyWalkingSpanBuilder implements Tracer.SpanBuilder {
@Override
public Iterable<Map.Entry<String, String>> baggageItems() {
return parentContext == null
? Collections.<String, String>emptyMap().entrySet()
: parentContext.baggageItems();
? Collections.<String, String>emptyMap().entrySet()
: parentContext.baggageItems();
}
}
......@@ -27,15 +27,15 @@ public class SkyWalkingTracer implements Tracer {
@Override
public <C> void inject(SpanContext spanContext, Format<C> format, C carrier) {
if (Format.Builtin.TEXT_MAP.equals(format) || Format.Builtin.HTTP_HEADERS.equals(format)) {
((TextMap) carrier).put(TRACE_HEAD_NAME, formatCrossProcessPropagationContextData());
((TextMap)carrier).put(TRACE_HEAD_NAME, formatCrossProcessPropagationContextData());
} else if (Format.Builtin.BINARY.equals(format)) {
byte[] key = TRACE_HEAD_NAME.getBytes(ByteBufferContext.CHARSET);
byte[] value = formatCrossProcessPropagationContextData().getBytes(ByteBufferContext.CHARSET);
((ByteBuffer) carrier).put(ByteBufferContext.ENTRY);
((ByteBuffer) carrier).putInt(key.length);
((ByteBuffer) carrier).putInt(value.length);
((ByteBuffer) carrier).put(key);
((ByteBuffer) carrier).put(value);
((ByteBuffer)carrier).put(ByteBufferContext.ENTRY);
((ByteBuffer)carrier).putInt(key.length);
((ByteBuffer)carrier).putInt(value.length);
((ByteBuffer)carrier).put(key);
((ByteBuffer)carrier).put(value);
} else {
throw new IllegalArgumentException("Unsupported format: " + format);
}
......@@ -44,7 +44,7 @@ public class SkyWalkingTracer implements Tracer {
@Override
public <C> SpanContext extract(Format<C> format, C carrier) {
if (Format.Builtin.TEXT_MAP.equals(format) || Format.Builtin.HTTP_HEADERS.equals(format)) {
TextMap textMapCarrier = (TextMap) carrier;
TextMap textMapCarrier = (TextMap)carrier;
extractCrossProcessPropagationContextData(textMapCarrier);
return new TextMapContext(textMapCarrier);
} else if (Format.Builtin.BINARY.equals(format)) {
......@@ -56,7 +56,6 @@ public class SkyWalkingTracer implements Tracer {
}
}
/**
* set context data in toolkit-opentracing-activation
*
......
......@@ -9,7 +9,7 @@ import java.util.Map;
/**
* Created by wusheng on 2016/12/21.
*/
public class TextMapContext implements SpanContext {
public class TextMapContext implements SpanContext {
private final TextMap textMap;
TextMapContext(TextMap textMap) {
......
......@@ -14,7 +14,7 @@ import java.util.Map;
*/
public class SkyWalkingTracerTest {
@Test
public void testBuildSpan(){
public void testBuildSpan() {
Tracer tracer = SkyWalkingTracer.INSTANCE;
Tracer.SpanBuilder spanBuilder = tracer.buildSpan("/http/serviceName");
......@@ -23,7 +23,7 @@ public class SkyWalkingTracerTest {
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"TextMapInjectAdapter should only be used with Tracer.inject()");
"TextMapInjectAdapter should only be used with Tracer.inject()");
}
@Override
......
......@@ -37,7 +37,9 @@
<repository>
<id>bintray-wu-sheng-sky-walking-repository</id>
<name>wu-sheng-sky-walking-repository</name>
<url>https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-trace-context/;publish=1</url>
<url>
https://api.bintray.com/maven/wu-sheng/skywalking/com.a.eye.skywalking-toolkit-trace-context/;publish=1
</url>
</repository>
</distributionManagement>
</project>
......@@ -17,28 +17,28 @@ public enum AkkaSystem {
private Logger logger = LogManager.getFormatterLogger(AkkaSystem.class);
public ActorSystem create() {
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + ClusterConfig.Cluster.Current.hostname).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + ClusterConfig.Cluster.Current.port)).
withFallback(ConfigFactory.load("application.conf"));
if (!StringUtil.isEmpty(ClusterConfig.Cluster.seed_nodes)) {
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.HOSTNAME=" + ClusterConfig.Cluster.Current.HOSTNAME).
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.PORT=" + ClusterConfig.Cluster.Current.PORT)).
withFallback(ConfigFactory.load("application.conf"));
if (!StringUtil.isEmpty(ClusterConfig.Cluster.SEED_NODES)) {
config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=" + generateSeedNodes()));
}
return ActorSystem.create(Const.SystemName, config);
return ActorSystem.create(Const.SYSTEM_NAME, config);
}
private String generateSeedNodes() {
String[] seedNodes = ClusterConfig.Cluster.seed_nodes.split(",");
String[] seedNodes = ClusterConfig.Cluster.SEED_NODES.split(",");
String akkaSeedNodes = "";
for (int i = 0; i < seedNodes.length; i++) {
String akkaNodeName = "\"akka.tcp://" + Const.SystemName + "@" + seedNodes[i] + "\"";
String akkaNodeName = "\"akka.tcp://" + Const.SYSTEM_NAME + "@" + seedNodes[i] + "\"";
if (i > 0) {
akkaSeedNodes += ",";
}
akkaSeedNodes += akkaNodeName;
}
akkaSeedNodes = "[" + akkaSeedNodes + "]";
logger.info("config seedNodes: %s, generate seedNodes: %s", ClusterConfig.Cluster.seed_nodes, akkaSeedNodes);
logger.info("config seedNodes: %s, generate seedNodes: %s", ClusterConfig.Cluster.SEED_NODES, akkaSeedNodes);
return akkaSeedNodes;
}
}
......@@ -2,20 +2,25 @@ package com.a.eye.skywalking.collector;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.AbstractClusterWorkerProvider;
import com.a.eye.skywalking.collector.actor.AbstractLocalWorkerProvider;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LookUp;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.UsedRoleNameException;
import com.a.eye.skywalking.collector.cluster.WorkersListener;
import com.a.eye.skywalking.collector.config.ConfigInitializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ServiceLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
*/
public class CollectorSystem {
private Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);
private static final Logger logger = LogManager.getFormatterLogger(CollectorSystem.class);
private ClusterWorkerContext clusterContext;
......@@ -37,7 +42,7 @@ public class CollectorSystem {
}
private void createListener() {
clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WorkName);
clusterContext.getAkkaSystem().actorOf(Props.create(WorkersListener.class, clusterContext), WorkersListener.WORK_NAME);
}
private void createClusterWorkers() throws ProviderNotFoundException {
......
......@@ -24,10 +24,10 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
/**
* Construct an <code>AbstractClusterWorker</code> with the worker role and context.
*
* @param role If multi-workers are for load balance, they should be more likely called worker instance.
* Meaning, each worker have multi instances.
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
......@@ -80,14 +80,14 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof ClusterEvent.CurrentClusterState) {
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState) message;
ClusterEvent.CurrentClusterState state = (ClusterEvent.CurrentClusterState)message;
for (Member member : state.getMembers()) {
if (member.status().equals(MemberStatus.up())) {
register(member);
}
}
} else if (message instanceof ClusterEvent.MemberUp) {
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp) message;
ClusterEvent.MemberUp memberUp = (ClusterEvent.MemberUp)message;
logger.info("receive ClusterEvent.MemberUp message, address: %s", memberUp.member().address().toString());
register(memberUp.member());
} else {
......@@ -97,16 +97,16 @@ public abstract class AbstractClusterWorker extends AbstractWorker {
}
/**
* When member role is {@link WorkersListener#WorkName} then Select actor from context
* When member role is {@link WorkersListener#WORK_NAME} then Select actor from context
* and send register message to {@link WorkersListener}
*
* @param member is the new created or restart worker
*/
void register(Member member) {
if (member.hasRole(WorkersListener.WorkName)) {
if (member.hasRole(WorkersListener.WORK_NAME)) {
WorkerListenerMessage.RegisterMessage registerMessage = new WorkerListenerMessage.RegisterMessage(ownerWorker.getRole());
logger.info("member address: %s, worker path: %s", member.address().toString(), getSelf().path().toString());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WorkName).tell(registerMessage, getSelf());
getContext().actorSelection(member.address() + "/user/" + WorkersListener.WORK_NAME).tell(registerMessage, getSelf());
}
}
}
......
......@@ -25,12 +25,12 @@ public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWor
*
* @param localContext Not used, will be null.
* @return The created worker reference. See {@link ClusterWorkerRef}
* @throws IllegalArgumentException Not used.
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another worker
* instance, when the worker provider not find then Throw this Exception.
* @throws IllegalArgumentException Not used.
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override
final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
@Override final public WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());
T clusterWorker = workerInstance(getClusterContext());
......
......@@ -17,10 +17,10 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
/**
* Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
*
* @param role The responsibility of worker in cluster, more than one workers can have
* same responsibility which use to provide load balancing ability.
* @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use
* to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
......@@ -68,8 +68,8 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
* Receive the message from disruptor, when message in disruptor is empty, then send the cached data
* to the next workers.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
......
......@@ -13,9 +13,9 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
public abstract int queueSize();
@Override
final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
T localAsyncWorker = (T) workerInstance(getClusterContext());
@Override final public WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
T localAsyncWorker = (T)workerInstance(getClusterContext());
localAsyncWorker.preStart();
// Specify the size of the ring buffer, must be power of 2.
......
......@@ -5,9 +5,9 @@ package com.a.eye.skywalking.collector.actor;
*/
public abstract class AbstractLocalSyncWorkerProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalWorkerProvider<T> {
@Override
final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
T localSyncWorker = (T) workerInstance(getClusterContext());
@Override final public WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
T localSyncWorker = (T)workerInstance(getClusterContext());
localSyncWorker.preStart();
LocalSyncWorkerRef workerRef = new LocalSyncWorkerRef(role(), localSyncWorker);
......
......@@ -11,7 +11,8 @@ public abstract class AbstractWorkerProvider<T extends AbstractWorker> implement
public abstract T workerInstance(ClusterWorkerContext clusterContext);
public abstract WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException;
public abstract WorkerRef onCreate(
LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException;
final public void setClusterContext(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
......@@ -21,7 +22,8 @@ public abstract class AbstractWorkerProvider<T extends AbstractWorker> implement
return clusterContext;
}
final public WorkerRef create(AbstractWorker workerOwner) throws IllegalArgumentException, ProviderNotFoundException {
final public WorkerRef create(
AbstractWorker workerOwner) throws IllegalArgumentException, ProviderNotFoundException {
if (workerInstance(clusterContext) == null) {
throw new IllegalArgumentException("cannot get worker instance with nothing obtained from workerInstance()");
}
......@@ -29,7 +31,7 @@ public abstract class AbstractWorkerProvider<T extends AbstractWorker> implement
if (workerOwner == null) {
return onCreate(null);
} else if (workerOwner.getSelfContext() instanceof LocalWorkerContext) {
return onCreate((LocalWorkerContext) workerOwner.getSelfContext());
return onCreate((LocalWorkerContext)workerOwner.getSelfContext());
} else {
throw new IllegalArgumentException("the argument of workerOwner is Illegal");
}
......
package com.a.eye.skywalking.collector.actor;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
......
package com.a.eye.skywalking.collector.actor;
public class ProviderNotFoundException extends Exception {
public ProviderNotFoundException(String message){
public ProviderNotFoundException(String message) {
super(message);
}
}
package com.a.eye.skywalking.collector.actor;
public class UsedRoleNameException extends Exception {
public UsedRoleNameException(String message){
public UsedRoleNameException(String message) {
super(message);
}
}
......@@ -20,8 +20,7 @@ public abstract class WorkerContext implements Context {
return this.roleWorkers;
}
@Override
final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
@Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
if (getRoleWorkers().containsKey(role.roleName())) {
WorkerRefs refs = new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector());
return refs;
......@@ -30,16 +29,14 @@ public abstract class WorkerContext implements Context {
}
}
@Override
final public void put(WorkerRef workerRef) {
@Override final public void put(WorkerRef workerRef) {
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<WorkerRef>());
}
getRoleWorkers().get(workerRef.getRole().roleName()).add(workerRef);
}
@Override
final public void remove(WorkerRef workerRef) {
@Override final public void remove(WorkerRef workerRef) {
getRoleWorkers().remove(workerRef.getRole().roleName());
}
}
package com.a.eye.skywalking.collector.actor;
public class WorkerNotFoundException extends Exception {
public WorkerNotFoundException(String message){
public WorkerNotFoundException(String message) {
super(message);
}
}
......@@ -29,7 +29,7 @@ public class WorkerRefs<T extends WorkerRef> {
public void ask(Object request, Object response) throws Exception {
WorkerRef workerRef = workerSelector.select(workerRefs, request);
if (workerRef instanceof LocalSyncWorkerRef) {
((LocalSyncWorkerRef) workerRef).ask(request, response);
((LocalSyncWorkerRef)workerRef).ask(request, response);
} else {
throw new IllegalAccessError("only local sync worker can ask");
}
......
......@@ -6,9 +6,9 @@ import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} by message {@link AbstractHashMessage} key's hashcode, so it can use to send the same hashcode
* message to same {@link WorkerRef}. Usually, use to database operate which avoid dirty data.
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}. It chooses a {@link WorkerRef}
* by the hash code of the {@link AbstractHashMessage} key, so messages with the same hash code are always sent to the same {@link
* WorkerRef}. It is typically used for database operations, to avoid dirty data.
*
* @author pengys5
* @since v3.0-2017
......@@ -25,7 +25,7 @@ public class HashCodeSelector implements WorkerSelector<WorkerRef> {
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
if (message instanceof AbstractHashMessage) {
AbstractHashMessage hashMessage = (AbstractHashMessage) message;
AbstractHashMessage hashMessage = (AbstractHashMessage)message;
int size = members.size();
int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return members.get(selectIndex);
......
......@@ -2,11 +2,11 @@ package com.a.eye.skywalking.collector.cluster;
/**
* A static class contains some config values of cluster.
* {@link Cluster.Current#hostname} is a ip address of server which start this process.
* {@link Cluster.Current#port} is a port of server use to bind
* {@link Cluster.Current#roles} is a roles of workers that use to create workers which
* {@link Cluster.Current#HOSTNAME} is the IP address of the server that starts this process.
* {@link Cluster.Current#PORT} is the port the server binds to.
* {@link Cluster.Current#ROLES} lists the roles of the workers to create, i.e. the workers
* that hold those roles in this process.
* {@link Cluster#seed_nodes} is a seed_nodes which cluster have, List of strings, e.g. seed_nodes = "ip:port,ip:port"..
* {@link Cluster#SEED_NODES} is the list of seed nodes of the cluster, a comma-separated string, e.g. SEED_NODES = "ip:port,ip:port".
*
* @author pengys5
*/
......@@ -14,11 +14,11 @@ public class ClusterConfig {
public static class Cluster {
public static class Current {
public static String hostname = "";
public static String port = "";
public static String roles = "";
public static String HOSTNAME = "";
public static String PORT = "";
public static String ROLES = "";
}
public static String seed_nodes = "";
public static String SEED_NODES = "";
}
}
......@@ -15,17 +15,17 @@ public class ClusterConfigProvider implements ConfigProvider {
@Override
public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("cluster.current.hostname"))) {
ClusterConfig.Cluster.Current.hostname = System.getProperty("cluster.current.hostname");
if (!StringUtil.isEmpty(System.getProperty("cluster.current.HOSTNAME"))) {
ClusterConfig.Cluster.Current.HOSTNAME = System.getProperty("cluster.current.HOSTNAME");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.port"))) {
ClusterConfig.Cluster.Current.port = System.getProperty("cluster.current.port");
if (!StringUtil.isEmpty(System.getProperty("cluster.current.PORT"))) {
ClusterConfig.Cluster.Current.PORT = System.getProperty("cluster.current.PORT");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.current.roles"))) {
ClusterConfig.Cluster.Current.roles = System.getProperty("cluster.current.roles");
if (!StringUtil.isEmpty(System.getProperty("cluster.current.ROLES"))) {
ClusterConfig.Cluster.Current.ROLES = System.getProperty("cluster.current.ROLES");
}
if (!StringUtil.isEmpty(System.getProperty("cluster.seed_nodes"))) {
ClusterConfig.Cluster.seed_nodes = System.getProperty("cluster.seed_nodes");
if (!StringUtil.isEmpty(System.getProperty("cluster.SEED_NODES"))) {
ClusterConfig.Cluster.SEED_NODES = System.getProperty("cluster.SEED_NODES");
}
}
}
......@@ -4,5 +4,5 @@ package com.a.eye.skywalking.collector.cluster;
* @author pengys5
*/
public class Const {
public static final String SystemName = "ClusterSystem";
public static final String SYSTEM_NAME = "ClusterSystem";
}
package com.a.eye.skywalking.collector.cluster;
/**
* The <code>NoAvailableWorkerException</code> indicates that no member is available
* when {@link WorkersRefCenter#availableWorks(String)} tries to get the list.
*
* Most likely, in the cluster, there is no active worker of the particular role.
*
* @author wusheng
*/
public class NoAvailableWorkerException extends Exception {

    /**
     * @param message detail describing which role had no available worker
     */
    public NoAvailableWorkerException(String message) {
        super(message);
    }
}
......@@ -27,17 +27,13 @@ import java.util.concurrent.ConcurrentHashMap;
* @author pengys5
*/
public class WorkersListener extends UntypedActor {
public static final String WORK_NAME = "WorkersListener";
private Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
public static final String WorkName = "WorkersListener";
private static final Logger logger = LogManager.getFormatterLogger(WorkersListener.class);
private final ClusterWorkerContext clusterContext;
private Cluster cluster = Cluster.get(getContext().system());
private Map<ActorRef, ClusterWorkerRef> relation = new ConcurrentHashMap<>();
private final ClusterWorkerContext clusterContext;
public WorkersListener(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
......@@ -50,18 +46,18 @@ public class WorkersListener extends UntypedActor {
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof WorkerListenerMessage.RegisterMessage) {
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage) message;
WorkerListenerMessage.RegisterMessage register = (WorkerListenerMessage.RegisterMessage)message;
ActorRef sender = getSender();
logger.info("register worker of role: %s, path: %s", register.getRole().roleName(), sender.toString());
ClusterWorkerRef workerRef = new ClusterWorkerRef(sender, register.getRole());
relation.put(sender, workerRef);
clusterContext.put(new ClusterWorkerRef(sender, register.getRole()));
} else if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
Terminated terminated = (Terminated)message;
clusterContext.remove(relation.get(terminated.getActor()));
relation.remove(terminated.getActor());
} else if (message instanceof ClusterEvent.UnreachableMember) {
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember) message;
ClusterEvent.UnreachableMember unreachableMember = (ClusterEvent.UnreachableMember)message;
Iterator<Map.Entry<ActorRef, ClusterWorkerRef>> iterator = relation.entrySet().iterator();
while (iterator.hasNext()) {
......
......@@ -12,34 +12,34 @@ public class ConfigInitializerTestCase {
@Before
public void clear() {
System.clearProperty("cluster.current.hostname");
System.clearProperty("cluster.current.port");
System.clearProperty("cluster.current.roles");
System.clearProperty("cluster.seed_nodes");
System.clearProperty("cluster.current.HOSTNAME");
System.clearProperty("cluster.current.PORT");
System.clearProperty("cluster.current.ROLES");
System.clearProperty("cluster.SEED_NODES");
}
@Test
public void testInitialize() throws Exception {
ConfigInitializer.INSTANCE.initialize();
Assert.assertEquals("127.0.0.1", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1000", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("WorkersListener", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("127.0.0.1:1000", ClusterConfig.Cluster.seed_nodes);
Assert.assertEquals("127.0.0.1", ClusterConfig.Cluster.Current.HOSTNAME);
Assert.assertEquals("1000", ClusterConfig.Cluster.Current.PORT);
Assert.assertEquals("WorkersListener", ClusterConfig.Cluster.Current.ROLES);
Assert.assertEquals("127.0.0.1:1000", ClusterConfig.Cluster.SEED_NODES);
}
@Test
public void testInitializeWithCli() throws Exception {
System.setProperty("cluster.current.hostname", "127.0.0.2");
System.setProperty("cluster.current.port", "1001");
System.setProperty("cluster.current.roles", "Test1, Test2");
System.setProperty("cluster.seed_nodes", "127.0.0.1:1000, 127.0.0.1:1001");
System.setProperty("cluster.current.HOSTNAME", "127.0.0.2");
System.setProperty("cluster.current.PORT", "1001");
System.setProperty("cluster.current.ROLES", "Test1, Test2");
System.setProperty("cluster.SEED_NODES", "127.0.0.1:1000, 127.0.0.1:1001");
ConfigInitializer.INSTANCE.initialize();
Assert.assertEquals("127.0.0.2", ClusterConfig.Cluster.Current.hostname);
Assert.assertEquals("1001", ClusterConfig.Cluster.Current.port);
Assert.assertEquals("Test1, Test2", ClusterConfig.Cluster.Current.roles);
Assert.assertEquals("127.0.0.1:1000, 127.0.0.1:1001", ClusterConfig.Cluster.seed_nodes);
Assert.assertEquals("127.0.0.2", ClusterConfig.Cluster.Current.HOSTNAME);
Assert.assertEquals("1001", ClusterConfig.Cluster.Current.PORT);
Assert.assertEquals("Test1, Test2", ClusterConfig.Cluster.Current.ROLES);
Assert.assertEquals("127.0.0.1:1000, 127.0.0.1:1001", ClusterConfig.Cluster.SEED_NODES);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n" />
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console" />
</Root>
</Loggers>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
......@@ -20,7 +20,7 @@ public class JsonSerializer extends JSerializer {
@Override
public byte[] toBinary(Object o) {
JsonObject jsonObject = (JsonObject) o;
JsonObject jsonObject = (JsonObject)o;
return jsonObject.toString().getBytes();
}
......
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id></id>
<formats>
<format>tar.gz</format>
......@@ -39,4 +39,4 @@
</includes>
</fileSet>
</fileSets>
</assembly>
\ No newline at end of file
</assembly>
......@@ -19,8 +19,7 @@ public abstract class AnalysisMember extends AbstractLocalAsyncWorker {
super.preStart();
}
@Override
final public void onWork(Object message) throws Exception {
@Override final public void onWork(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
aggregation();
} else {
......
......@@ -12,8 +12,7 @@ import org.apache.logging.log4j.Logger;
* @author pengys5
*/
public class CollectorBootStartUp {
private static Logger logger = LogManager.getFormatterLogger(CollectorBootStartUp.class);
private static final Logger logger = LogManager.getFormatterLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws Exception {
logger.info("collector system starting....");
......@@ -21,6 +20,6 @@ public class CollectorBootStartUp {
collectorSystem.boot();
EsClient.INSTANCE.boot();
IndexCreator.INSTANCE.create();
HttpServer.INSTANCE.boot((ClusterWorkerContext) collectorSystem.getClusterContext());
HttpServer.INSTANCE.boot((ClusterWorkerContext)collectorSystem.getClusterContext());
}
}
......@@ -25,7 +25,7 @@ public abstract class MergeAnalysisMember extends AnalysisMember {
final protected void setMergeData(String id, String column, String value) throws Exception {
getPersistenceData().getElseCreate(id).setMergeData(column, value);
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Analysis.size) {
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Analysis.SIZE) {
aggregation();
}
}
......
......@@ -38,12 +38,11 @@ public abstract class MergePersistenceMember extends PersistenceMember {
return persistenceData;
}
@Override
final public void analyse(Object message) throws Exception {
@Override final public void analyse(Object message) throws Exception {
if (message instanceof MergeData) {
MergeData mergeData = (MergeData) message;
MergeData mergeData = (MergeData)message;
getPersistenceData().getElseCreate(mergeData.getId()).merge(mergeData);
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Persistence.size) {
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence();
}
} else {
......@@ -82,7 +81,7 @@ public abstract class MergePersistenceMember extends PersistenceMember {
private boolean saveToEs() {
Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", getPersistenceData().size());
logger.debug("persistenceData SIZE: %s", getPersistenceData().size());
Iterator<Map.Entry<String, MergeData>> iterator = getPersistenceData().iterator();
while (iterator.hasNext()) {
......
......@@ -20,7 +20,7 @@ public abstract class MetricAnalysisMember extends AnalysisMember {
final protected void setMetric(String id, String column, Long value) throws Exception {
persistenceData.getElseCreate(id).setMetric(column, value);
if (persistenceData.size() >= CacheSizeConfig.Cache.Persistence.size) {
if (persistenceData.size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
aggregation();
}
}
......
......@@ -33,12 +33,11 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
super(role, clusterContext, selfContext);
}
@Override
final public void analyse(Object message) throws Exception {
@Override final public void analyse(Object message) throws Exception {
if (message instanceof MetricData) {
MetricData metricData = (MetricData) message;
MetricData metricData = (MetricData)message;
persistenceData.getElseCreate(metricData.getId()).merge(metricData);
if (persistenceData.size() >= CacheSizeConfig.Cache.Persistence.size) {
if (persistenceData.size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence();
}
} else {
......@@ -78,7 +77,7 @@ public abstract class MetricPersistenceMember extends PersistenceMember {
private boolean saveToEs() {
Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", persistenceData.size());
logger.debug("persistenceData SIZE: %s", persistenceData.size());
Iterator<Map.Entry<String, MetricData>> iterator = persistenceData.iterator();
while (iterator.hasNext()) {
......
......@@ -22,13 +22,11 @@ public abstract class PersistenceMember extends AbstractLocalAsyncWorker {
public abstract void analyse(Object message) throws Exception;
@Override
final public void preStart() throws ProviderNotFoundException {
@Override final public void preStart() throws ProviderNotFoundException {
}
@Override
final protected void onWork(Object message) throws Exception {
@Override final protected void onWork(Object message) throws Exception {
if (message instanceof EndOfBatchCommand) {
persistence();
} else {
......
......@@ -21,7 +21,7 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
final public void setRecord(String id, JsonObject record) throws Exception {
persistenceData.getElseCreate(id).setRecord(record);
if (persistenceData.size() >= CacheSizeConfig.Cache.Analysis.size) {
if (persistenceData.size() >= CacheSizeConfig.Cache.Analysis.SIZE) {
aggregation();
}
}
......
......@@ -37,10 +37,10 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof RecordData) {
RecordData recordData = (RecordData) message;
RecordData recordData = (RecordData)message;
logger.debug("setRecord: id: %s, data: %s", recordData.getId(), recordData.getRecord());
getPersistenceData().getElseCreate(recordData.getId()).setRecord(recordData.getRecord());
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Persistence.size) {
if (getPersistenceData().size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence();
}
} else {
......@@ -58,7 +58,7 @@ public abstract class RecordPersistenceMember extends PersistenceMember {
private boolean saveToEs() {
Client client = EsClient.INSTANCE.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
logger.debug("persistenceData size: %s", getPersistenceData().size());
logger.debug("persistenceData SIZE: %s", getPersistenceData().size());
Iterator<Map.Entry<String, RecordData>> iterator = getPersistenceData().iterator();
......
......@@ -8,7 +8,7 @@ public abstract class TimeSlice {
private long startTime;
private long endTime;
public TimeSlice(String sliceType,long startTime, long endTime) {
public TimeSlice(String sliceType, long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
this.sliceType = sliceType;
......
......@@ -7,11 +7,11 @@ public class CacheSizeConfig {
public static class Cache {
public static class Analysis {
public static int size = 1000;
public static int SIZE = 1000;
}
public static class Persistence {
public static int size = 1000;
public static int SIZE = 1000;
}
}
}
......@@ -7,26 +7,26 @@ public class EsConfig {
public static class Es {
public static class Cluster {
public static String name = "";
public static String nodes = "";
public static String NAME = "";
public static String NODES = "";
public static class Transport {
public static String sniffer = "";
public static String SNIFFER = "";
}
}
public static class Index {
public static class Initialize {
public static IndexInitMode mode;
public static IndexInitMode MODE;
}
public static class Shards {
public static String number = "";
public static String NUMBER = "";
}
public static class Replicas {
public static String number = "";
public static String NUMBER = "";
}
}
}
......
......@@ -15,21 +15,21 @@ public class EsConfigProvider implements ConfigProvider {
@Override
public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("es.cluster.name"))) {
EsConfig.Es.Cluster.name = System.getProperty("es.cluster.name");
if (!StringUtil.isEmpty(System.getProperty("es.cluster.NAME"))) {
EsConfig.Es.Cluster.NAME = System.getProperty("es.cluster.NAME");
}
if (!StringUtil.isEmpty(System.getProperty("es.cluster.nodes"))) {
EsConfig.Es.Cluster.nodes = System.getProperty("es.cluster.nodes");
if (!StringUtil.isEmpty(System.getProperty("es.cluster.NODES"))) {
EsConfig.Es.Cluster.NODES = System.getProperty("es.cluster.NODES");
}
if (!StringUtil.isEmpty(System.getProperty("es.cluster.transport.sniffer"))) {
EsConfig.Es.Cluster.Transport.sniffer = System.getProperty("es.cluster.transport.sniffer");
if (!StringUtil.isEmpty(System.getProperty("es.cluster.transport.SNIFFER"))) {
EsConfig.Es.Cluster.Transport.SNIFFER = System.getProperty("es.cluster.transport.SNIFFER");
}
if (!StringUtil.isEmpty(System.getProperty("es.index.shards.number"))) {
EsConfig.Es.Index.Shards.number = System.getProperty("es.index.shards.number");
if (!StringUtil.isEmpty(System.getProperty("es.index.shards.NUMBER"))) {
EsConfig.Es.Index.Shards.NUMBER = System.getProperty("es.index.shards.NUMBER");
}
if (!StringUtil.isEmpty(System.getProperty("es.index.replicas.number"))) {
EsConfig.Es.Index.Replicas.number = System.getProperty("es.index.replicas.number");
if (!StringUtil.isEmpty(System.getProperty("es.index.replicas.NUMBER"))) {
EsConfig.Es.Index.Replicas.NUMBER = System.getProperty("es.index.replicas.NUMBER");
}
}
}
......@@ -6,8 +6,8 @@ package com.a.eye.skywalking.collector.worker.config;
public class HttpConfig {
public static class Http {
public static String hostname = "";
public static String port = "";
public static String contextPath = "";
public static String HOSTNAME = "";
public static String PORT = "";
public static String CONTEXTPATH = "";
}
}
......@@ -15,14 +15,14 @@ public class HttpConfigProvider implements ConfigProvider {
@Override
public void cliArgs() {
if (!StringUtil.isEmpty(System.getProperty("http.hostname"))) {
HttpConfig.Http.hostname = System.getProperty("http.hostname");
if (!StringUtil.isEmpty(System.getProperty("http.HOSTNAME"))) {
HttpConfig.Http.HOSTNAME = System.getProperty("http.HOSTNAME");
}
if (!StringUtil.isEmpty(System.getProperty("http.port"))) {
HttpConfig.Http.port = System.getProperty("http.port");
if (!StringUtil.isEmpty(System.getProperty("http.PORT"))) {
HttpConfig.Http.PORT = System.getProperty("http.PORT");
}
if (!StringUtil.isEmpty(System.getProperty("http.contextPath"))) {
HttpConfig.Http.contextPath = System.getProperty("http.contextPath");
if (!StringUtil.isEmpty(System.getProperty("http.CONTEXTPATH"))) {
HttpConfig.Http.CONTEXTPATH = System.getProperty("http.CONTEXTPATH");
}
}
}
......@@ -8,51 +8,51 @@ public class WorkerConfig {
public static class WorkerNum {
public static class Node {
public static class NodeCompAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeMappingDayAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeMappingHourAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeMappingMinuteAgg {
public static int Value = 10;
public static int VALUE = 10;
}
}
public static class NodeRef {
public static class NodeRefDayAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeRefHourAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeRefMinuteAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeRefResSumDayAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeRefResSumHourAgg {
public static int Value = 10;
public static int VALUE = 10;
}
public static class NodeRefResSumMinuteAgg {
public static int Value = 10;
public static int VALUE = 10;
}
}
public static class GlobalTrace {
public static class GlobalTraceAgg {
public static int Value = 10;
public static int VALUE = 10;
}
}
}
......@@ -60,113 +60,113 @@ public class WorkerConfig {
public static class Queue {
public static class GlobalTrace {
public static class GlobalTraceSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class GlobalTraceAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
}
public static class Segment {
public static class SegmentPost {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class SegmentCostSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class SegmentSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class SegmentExceptionSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
}
public static class Node {
public static class NodeCompAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeMappingDayAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeMappingHourAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeMappingMinuteAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeCompSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeMappingDaySave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeMappingHourSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeMappingMinuteSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
}
public static class NodeRef {
public static class NodeRefDayAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefHourAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefMinuteAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefDaySave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefHourSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefMinuteSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefResSumDaySave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefResSumHourSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefResSumMinuteSave {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefResSumDayAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefResSumHourAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
public static class NodeRefResSumMinuteAnalysis {
public static int Size = 1024;
public static int SIZE = 1024;
}
}
}
......
......@@ -11,13 +11,12 @@ import java.io.IOException;
*/
public class GlobalTraceIndex extends AbstractIndex {
public static final String Index = "global_trace_idx";
public static final String SubSegIds = "subSegIds";
public static final String INDEX = "global_trace_idx";
public static final String SUB_SEG_IDS = "subSegIds";
@Override
public String index() {
return Index;
return INDEX;
}
@Override
......@@ -28,14 +27,14 @@ public class GlobalTraceIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(SubSegIds)
.field("type", "text")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
.startObject()
.startObject("properties")
.startObject(SUB_SEG_IDS)
.field("type", "text")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -29,14 +29,14 @@ public class GlobalTraceAnalysis extends MergeAnalysisMember {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
String subSegmentId = segment.getTraceSegmentId();
List<DistributedTraceId> globalTraceIdList = segment.getRelatedGlobalTraces();
if (CollectionTools.isNotEmpty(globalTraceIdList)) {
for (DistributedTraceId disTraceId : globalTraceIdList) {
String traceId = disTraceId.get();
setMergeData(traceId, GlobalTraceIndex.SubSegIds, subSegmentId);
setMergeData(traceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
}
}
}
......@@ -65,7 +65,7 @@ public class GlobalTraceAnalysis extends MergeAnalysisMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.GlobalTrace.GlobalTraceAnalysis.Size;
return WorkerConfig.Queue.GlobalTrace.GlobalTraceAnalysis.SIZE;
}
}
......
......@@ -15,7 +15,8 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(GlobalTraceAgg.class);
GlobalTraceAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
GlobalTraceAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +49,7 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.GlobalTrace.GlobalTraceAgg.Value;
return WorkerConfig.WorkerNum.GlobalTrace.GlobalTraceAgg.VALUE;
}
}
......
package com.a.eye.skywalking.collector.worker.globaltrace.persistence;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorkerProvider;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
......@@ -15,18 +14,19 @@ import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
*/
public class GlobalTraceSave extends MergePersistenceMember {
GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return GlobalTraceIndex.Index;
return GlobalTraceIndex.INDEX;
}
@Override
public String esType() {
return GlobalTraceIndex.Type_Record;
return GlobalTraceIndex.TYPE_RECORD;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTraceSave> {
......@@ -39,7 +39,7 @@ public class GlobalTraceSave extends MergePersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.GlobalTrace.GlobalTraceSave.Size;
return WorkerConfig.Queue.GlobalTrace.GlobalTraceSave.SIZE;
}
@Override
......
......@@ -39,18 +39,18 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
@Override
protected void onWork(Object request, Object response) throws Exception {
if (request instanceof String) {
String globalId = (String) request;
String globalTraceData = GetResponseFromEs.INSTANCE.get(GlobalTraceIndex.Index, GlobalTraceIndex.Type_Record, globalId).getSourceAsString();
String globalId = (String)request;
String globalTraceData = GetResponseFromEs.INSTANCE.get(GlobalTraceIndex.INDEX, GlobalTraceIndex.TYPE_RECORD, globalId).getSourceAsString();
JsonObject globalTraceObj = gson.fromJson(globalTraceData, JsonObject.class);
logger.debug("globalTraceObj: %s", globalTraceObj);
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SubSegIds).getAsString();
String[] subSegIds = subSegIdsStr.split(MergeData.Split);
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SUB_SEG_IDS).getAsString();
String[] subSegIds = subSegIdsStr.split(MergeData.SPLIT);
List<SpanView> spanViewList = new ArrayList<>();
for (String subSegId : subSegIds) {
logger.debug("subSegId: %s", subSegId);
String segmentSource = GetResponseFromEs.INSTANCE.get(SegmentIndex.Index, SegmentIndex.Type_Record, subSegId).getSourceAsString();
String segmentSource = GetResponseFromEs.INSTANCE.get(SegmentIndex.INDEX, SegmentIndex.TYPE_RECORD, subSegId).getSourceAsString();
logger.debug("segmentSource: %s", segmentSource);
Segment segment = SegmentDeserialize.INSTANCE.deserializeFromES(segmentSource);
String segmentId = segment.getTraceSegmentId();
......@@ -62,7 +62,7 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
}
}
JsonObject responseObj = (JsonObject) response;
JsonObject responseObj = (JsonObject)response;
responseObj.addProperty("result", buildTree(spanViewList));
}
}
......@@ -115,7 +115,8 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
return tempList;
}
private void spansDataBuild(Span span, String appCode, String segmentId, List<SpanView> spanViewList, List<TraceSegmentRef> refsList) {
private void spansDataBuild(Span span, String appCode, String segmentId, List<SpanView> spanViewList,
List<TraceSegmentRef> refsList) {
int spanId = span.getSpanId();
String spanSegId = segmentId + "--" + String.valueOf(spanId);
......
......@@ -18,15 +18,14 @@ public abstract class AbstractGet extends AbstractLocalSyncWorker {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request, Object response) throws Exception {
Map<String, String[]> parameterMap = (Map<String, String[]>) request;
@Override final public void onWork(Object request, Object response) throws Exception {
Map<String, String[]> parameterMap = (Map<String, String[]>)request;
try {
onSearch(parameterMap, (JsonObject) response);
onSearch(parameterMap, (JsonObject)response);
} catch (Exception e) {
e.printStackTrace();
((JsonObject) response).addProperty("isSuccess", false);
((JsonObject) response).addProperty("reason", e.getMessage());
((JsonObject)response).addProperty("isSuccess", false);
((JsonObject)response).addProperty("reason", e.getMessage());
}
}
......@@ -40,8 +39,8 @@ public abstract class AbstractGet extends AbstractLocalSyncWorker {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@Override final protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
Map<String, String[]> parameterMap = request.getParameterMap();
JsonObject resJson = new JsonObject();
......
......@@ -11,8 +11,9 @@ public abstract class AbstractGetProvider<T extends AbstractLocalSyncWorker> ext
public abstract String servletPath();
final protected void create(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef) super.create(AbstractWorker.noOwner());
final protected void create(
ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalSyncWorkerRef workerRef = (LocalSyncWorkerRef)super.create(AbstractWorker.noOwner());
AbstractGet.GetWithHttpServlet getWithHttpServlet = new AbstractGet.GetWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(getWithHttpServlet), servletPath());
}
......
......@@ -23,10 +23,9 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object request) throws Exception {
@Override final public void onWork(Object request) throws Exception {
if (request instanceof String) {
onReceive((String) request);
onReceive((String)request);
} else {
logger.error("unhandled request, request instance must String, but is %s", request.getClass().toString());
saveException(new IllegalArgumentException("request instance must String"));
......@@ -43,8 +42,8 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@Override final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
......
......@@ -11,8 +11,9 @@ public abstract class AbstractPostProvider<T extends AbstractLocalAsyncWorker> e
public abstract String servletPath();
final protected void create(ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalAsyncWorkerRef workerRef = (LocalAsyncWorkerRef) super.create(AbstractWorker.noOwner());
final protected void create(
ServletContextHandler context) throws IllegalArgumentException, ProviderNotFoundException {
LocalAsyncWorkerRef workerRef = (LocalAsyncWorkerRef)super.create(AbstractWorker.noOwner());
AbstractPost.PostWithHttpServlet postWithHttpServlet = new AbstractPost.PostWithHttpServlet(workerRef);
context.addServlet(new ServletHolder(postWithHttpServlet), servletPath());
}
......
......@@ -18,9 +18,9 @@ public enum HttpServer {
private Logger logger = LogManager.getFormatterLogger(HttpServer.class);
public void boot(ClusterWorkerContext clusterContext) throws Exception {
Server server = new Server(new InetSocketAddress(HttpConfig.Http.hostname, Integer.valueOf(HttpConfig.Http.port)));
Server server = new Server(new InetSocketAddress(HttpConfig.Http.HOSTNAME, Integer.valueOf(HttpConfig.Http.PORT)));
String contextPath = HttpConfig.Http.contextPath;
String contextPath = HttpConfig.Http.CONTEXTPATH;
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
logger.info("http server root context path: %s", contextPath);
......@@ -31,4 +31,4 @@ public enum HttpServer {
server.start();
server.join();
}
}
\ No newline at end of file
}
......@@ -16,7 +16,8 @@ public enum ServletsCreator {
private Logger logger = LogManager.getFormatterLogger(ServletsCreator.class);
public void boot(ServletContextHandler servletContextHandler, ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
public void boot(ServletContextHandler servletContextHandler,
ClusterWorkerContext clusterContext) throws IllegalArgumentException, ProviderNotFoundException {
ServiceLoader<AbstractPostProvider> receiverLoader = java.util.ServiceLoader.load(AbstractPostProvider.class);
for (AbstractPostProvider provider : receiverLoader) {
provider.setClusterContext(clusterContext);
......
......@@ -11,14 +11,13 @@ import java.io.IOException;
*/
public class NodeCompIndex extends AbstractIndex {
public static final String Index = "node_comp_idx";
public static final String Name = "name";
public static final String Peers = "peers";
public static final String INDEX = "node_comp_idx";
public static final String NAME = "name";
public static final String PEERS = "peers";
@Override
public String index() {
return Index;
return INDEX;
}
@Override
......@@ -29,22 +28,22 @@ public class NodeCompIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(Name)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Peers)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
.startObject()
.startObject("properties")
.startObject(NAME)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(PEERS)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -11,14 +11,13 @@ import java.io.IOException;
*/
public class NodeMappingIndex extends AbstractIndex {
public static final String Index = "node_mapping_idx";
public static final String Code = "code";
public static final String Peers = "peers";
public static final String INDEX = "node_mapping_idx";
public static final String CODE = "code";
public static final String PEERS = "peers";
@Override
public String index() {
return Index;
return INDEX;
}
@Override
......@@ -29,26 +28,26 @@ public class NodeMappingIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(Code)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Peers)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
.startObject()
.startObject("properties")
.startObject(CODE)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(PEERS)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(TIME_SLICE)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -23,7 +23,8 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeCompAnalysis.class);
AbstractNodeCompAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
AbstractNodeCompAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -32,23 +33,23 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
logger.debug("node analysis span isNotEmpty %s", CollectionTools.isNotEmpty(spanList));
if (CollectionTools.isNotEmpty(spanList)) {
logger.debug("node analysis span list size: %s", spanList.size());
logger.debug("node analysis span list SIZE: %s", spanList.size());
for (Span span : spanList) {
String kind = Tags.SPAN_KIND.get(span);
if (Tags.SPAN_KIND_CLIENT.equals(kind) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
String peers = SpanPeersTools.INSTANCE.getPeers(span);
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.Peers, peers);
compJsonObj.addProperty(NodeCompIndex.Name, Tags.COMPONENT.get(span));
compJsonObj.addProperty(NodeCompIndex.PEERS, peers);
compJsonObj.addProperty(NodeCompIndex.NAME, Tags.COMPONENT.get(span));
setRecord(peers, compJsonObj);
} else if (Tags.SPAN_KIND_SERVER.equals(kind) && span.getParentSpanId() == -1) {
String peers = segment.getApplicationCode();
JsonObject compJsonObj = new JsonObject();
compJsonObj.addProperty(NodeCompIndex.Peers, peers);
compJsonObj.addProperty(NodeCompIndex.Name, Tags.COMPONENT.get(span));
compJsonObj.addProperty(NodeCompIndex.PEERS, peers);
compJsonObj.addProperty(NodeCompIndex.NAME, Tags.COMPONENT.get(span));
setRecord(peers, compJsonObj);
} else {
......@@ -57,4 +58,4 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
}
}
}
}
\ No newline at end of file
}
......@@ -21,7 +21,8 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeMappingAnalysis.class);
AbstractNodeMappingAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
AbstractNodeMappingAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -30,19 +31,19 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
logger.debug("node mapping analysis refs isNotEmpty %s", CollectionTools.isNotEmpty(segmentRefList));
if (CollectionTools.isNotEmpty(segmentRefList)) {
logger.debug("node mapping analysis refs list size: %s", segmentRefList.size());
logger.debug("node mapping analysis refs list SIZE: %s", segmentRefList.size());
for (TraceSegmentRef segmentRef : segmentRefList) {
String peers = Const.PEERS_FRONT_SPLIT + segmentRef.getPeerHost() + Const.PEERS_BEHIND_SPLIT;
String code = segment.getApplicationCode();
JsonObject nodeMappingJsonObj = new JsonObject();
nodeMappingJsonObj.addProperty(NodeMappingIndex.Code, code);
nodeMappingJsonObj.addProperty(NodeMappingIndex.Peers, peers);
nodeMappingJsonObj.addProperty(NodeMappingIndex.Time_Slice, timeSlice);
nodeMappingJsonObj.addProperty(NodeMappingIndex.CODE, code);
nodeMappingJsonObj.addProperty(NodeMappingIndex.PEERS, peers);
nodeMappingJsonObj.addProperty(NodeMappingIndex.TIME_SLICE, timeSlice);
String id = timeSlice + Const.ID_SPLIT + code + Const.ID_SPLIT + peers;
setRecord(id, nodeMappingJsonObj);
}
}
}
}
\ No newline at end of file
}
......@@ -16,14 +16,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
NodeCompAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeCompAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseSpans(segment);
}
......@@ -52,7 +53,7 @@ public class NodeCompAnalysis extends AbstractNodeCompAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeCompAnalysis.Size;
return WorkerConfig.Queue.Node.NodeCompAnalysis.SIZE;
}
}
......
......@@ -16,14 +16,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class NodeMappingDayAnalysis extends AbstractNodeMappingAnalysis {
public NodeMappingDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public NodeMappingDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getDay());
}
......@@ -52,7 +53,7 @@ public class NodeMappingDayAnalysis extends AbstractNodeMappingAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingDayAnalysis.Size;
return WorkerConfig.Queue.Node.NodeMappingDayAnalysis.SIZE;
}
}
......
......@@ -16,14 +16,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class NodeMappingHourAnalysis extends AbstractNodeMappingAnalysis {
NodeMappingHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getHour());
}
......@@ -52,7 +53,7 @@ public class NodeMappingHourAnalysis extends AbstractNodeMappingAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingHourAnalysis.Size;
return WorkerConfig.Queue.Node.NodeMappingHourAnalysis.SIZE;
}
}
......
......@@ -16,14 +16,15 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class NodeMappingMinuteAnalysis extends AbstractNodeMappingAnalysis {
NodeMappingMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
analyseRefs(segment, segmentWithTimeSlice.getMinute());
}
......@@ -52,7 +53,7 @@ public class NodeMappingMinuteAnalysis extends AbstractNodeMappingAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingMinuteAnalysis.Size;
return WorkerConfig.Queue.Node.NodeMappingMinuteAnalysis.SIZE;
}
}
......
......@@ -11,7 +11,8 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeCompAgg extends AbstractClusterWorker {
NodeCompAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeCompAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -24,7 +25,8 @@ public class NodeCompAgg extends AbstractClusterWorker {
protected void onWork(Object message) throws Exception {
if (message instanceof RecordData) {
getSelfContext().lookup(NodeCompSave.Role.INSTANCE).tell(message);
} else throw new IllegalArgumentException("message instance must RecordData");
} else
throw new IllegalArgumentException("message instance must RecordData");
}
public static class Factory extends AbstractClusterWorkerProvider<NodeCompAgg> {
......@@ -42,7 +44,7 @@ public class NodeCompAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeCompAgg.Value;
return WorkerConfig.WorkerNum.Node.NodeCompAgg.VALUE;
}
}
......
......@@ -27,8 +27,8 @@ public class NodeCompLoad extends AbstractLocalSyncWorker {
@Override
public void onWork(Object request, Object response) throws Exception {
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeCompIndex.Index);
searchRequestBuilder.setTypes(NodeCompIndex.Type_Record);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeCompIndex.INDEX);
searchRequestBuilder.setTypes(NodeCompIndex.TYPE_RECORD);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(100);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
......@@ -38,13 +38,13 @@ public class NodeCompLoad extends AbstractLocalSyncWorker {
JsonArray nodeCompArray = new JsonArray();
for (SearchHit searchHit : searchHits) {
JsonObject nodeCompObj = new JsonObject();
nodeCompObj.addProperty(NodeCompIndex.Name, (String) searchHit.getSource().get(NodeCompIndex.Name));
nodeCompObj.addProperty(NodeCompIndex.Peers, (String) searchHit.getSource().get(NodeCompIndex.Peers));
nodeCompObj.addProperty(NodeCompIndex.NAME, (String)searchHit.getSource().get(NodeCompIndex.NAME));
nodeCompObj.addProperty(NodeCompIndex.PEERS, (String)searchHit.getSource().get(NodeCompIndex.PEERS));
nodeCompArray.add(nodeCompObj);
logger.debug("node: %s", nodeCompObj.toString());
}
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add("result", nodeCompArray);
}
......
......@@ -14,18 +14,19 @@ import com.a.eye.skywalking.collector.worker.node.NodeCompIndex;
*/
public class NodeCompSave extends RecordPersistenceMember {
NodeCompSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeCompSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeCompIndex.Index;
return NodeCompIndex.INDEX;
}
@Override
public String esType() {
return NodeCompIndex.Type_Record;
return NodeCompIndex.TYPE_RECORD;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeCompSave> {
......@@ -43,7 +44,7 @@ public class NodeCompSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeCompSave.Size;
return WorkerConfig.Queue.Node.NodeCompSave.SIZE;
}
}
......
......@@ -11,7 +11,8 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeMappingDayAgg extends AbstractClusterWorker {
NodeMappingDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -44,7 +45,7 @@ public class NodeMappingDayAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeMappingDayAgg.Value;
return WorkerConfig.WorkerNum.Node.NodeMappingDayAgg.VALUE;
}
}
......
......@@ -14,18 +14,19 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingDaySave extends RecordPersistenceMember {
NodeMappingDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeMappingIndex.Index;
return NodeMappingIndex.INDEX;
}
@Override
public String esType() {
return NodeMappingIndex.Type_Day;
return NodeMappingIndex.TYPE_DAY;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingDaySave> {
......@@ -43,7 +44,7 @@ public class NodeMappingDaySave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingDaySave.Size;
return WorkerConfig.Queue.Node.NodeMappingDaySave.SIZE;
}
}
......
......@@ -11,7 +11,8 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeMappingHourAgg extends AbstractClusterWorker {
NodeMappingHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -44,7 +45,7 @@ public class NodeMappingHourAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeMappingHourAgg.Value;
return WorkerConfig.WorkerNum.Node.NodeMappingHourAgg.VALUE;
}
}
......
......@@ -14,18 +14,19 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingHourSave extends RecordPersistenceMember {
NodeMappingHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeMappingIndex.Index;
return NodeMappingIndex.INDEX;
}
@Override
public String esType() {
return NodeMappingIndex.Type_Hour;
return NodeMappingIndex.TYPE_HOUR;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingHourSave> {
......@@ -43,7 +44,7 @@ public class NodeMappingHourSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingHourSave.Size;
return WorkerConfig.Queue.Node.NodeMappingHourSave.SIZE;
}
}
......
......@@ -11,7 +11,8 @@ import com.a.eye.skywalking.collector.worker.storage.RecordData;
*/
public class NodeMappingMinuteAgg extends AbstractClusterWorker {
NodeMappingMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingMinuteAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -44,7 +45,7 @@ public class NodeMappingMinuteAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.Node.NodeMappingMinuteAgg.Value;
return WorkerConfig.WorkerNum.Node.NodeMappingMinuteAgg.VALUE;
}
}
......
......@@ -14,18 +14,19 @@ import com.a.eye.skywalking.collector.worker.node.NodeMappingIndex;
*/
public class NodeMappingMinuteSave extends RecordPersistenceMember {
NodeMappingMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeMappingMinuteSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeMappingIndex.Index;
return NodeMappingIndex.INDEX;
}
@Override
public String esType() {
return NodeMappingIndex.Type_Minute;
return NodeMappingIndex.TYPE_MINUTE;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMappingMinuteSave> {
......@@ -43,7 +44,7 @@ public class NodeMappingMinuteSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.Node.NodeMappingMinuteSave.Size;
return WorkerConfig.Queue.Node.NodeMappingMinuteSave.SIZE;
}
}
......
......@@ -32,12 +32,12 @@ public class NodeMappingSearchWithTimeSlice extends AbstractLocalSyncWorker {
@Override
public void onWork(Object request, Object response) throws Exception {
if (request instanceof RequestEntity) {
RequestEntity search = (RequestEntity) request;
RequestEntity search = (RequestEntity)request;
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeMappingIndex.Index);
SearchRequestBuilder searchRequestBuilder = EsClient.INSTANCE.getClient().prepareSearch(NodeMappingIndex.INDEX);
searchRequestBuilder.setTypes(search.getSliceType());
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeMappingIndex.Time_Slice).gte(search.getStartTime()).lte(search.getEndTime()));
searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(NodeMappingIndex.TIME_SLICE).gte(search.getStartTime()).lte(search.getEndTime()));
searchRequestBuilder.setSize(0);
searchRequestBuilder.addAggregation(AggregationBuilders.terms(NodeMappingIndex.AGG_COLUMN).field(NodeMappingIndex.AGG_COLUMN).size(100));
......@@ -53,13 +53,13 @@ public class NodeMappingSearchWithTimeSlice extends AbstractLocalSyncWorker {
String peers = aggIds[1];
JsonObject nodeMappingObj = new JsonObject();
nodeMappingObj.addProperty(NodeMappingIndex.Code, code);
nodeMappingObj.addProperty(NodeMappingIndex.Peers, peers);
nodeMappingObj.addProperty(NodeMappingIndex.CODE, code);
nodeMappingObj.addProperty(NodeMappingIndex.PEERS, peers);
nodeMappingArray.add(nodeMappingObj);
}
logger.debug("node mapping data: %s", nodeMappingArray.toString());
JsonObject resJsonObj = (JsonObject) response;
JsonObject resJsonObj = (JsonObject)response;
resJsonObj.add(Const.RESULT, nodeMappingArray);
} else {
throw new IllegalArgumentException("message instance must be RequestEntity");
......
......@@ -10,17 +10,15 @@ import java.io.IOException;
* @author pengys5
*/
public class NodeRefIndex extends AbstractIndex {
public static final String Index = "node_ref_idx";
public static final String Front = "front";
public static final String FrontIsRealCode = "frontIsRealCode";
public static final String Behind = "behind";
public static final String BehindIsRealCode = "behindIsRealCode";
public static final String INDEX = "node_ref_idx";
public static final String FRONT = "front";
public static final String FRONT_IS_REAL_CODE = "frontIsRealCode";
public static final String BEHIND = "behind";
public static final String BEHIND_IS_REAL_CODE = "behindIsRealCode";
@Override
public String index() {
return Index;
return INDEX;
}
@Override
......@@ -31,34 +29,34 @@ public class NodeRefIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(Front)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(FrontIsRealCode)
.field("type", "boolean")
.field("index", "not_analyzed")
.endObject()
.startObject(Behind)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(BehindIsRealCode)
.field("type", "boolean")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
.startObject()
.startObject("properties")
.startObject(FRONT)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(FRONT_IS_REAL_CODE)
.field("type", "boolean")
.field("index", "not_analyzed")
.endObject()
.startObject(BEHIND)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(BEHIND_IS_REAL_CODE)
.field("type", "boolean")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(TIME_SLICE)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -39,7 +39,7 @@ public class NodeRefResSumGetGroupWithTimeSlice extends AbstractGet {
throw new IllegalArgumentException("the request parameter must contains startTime,endTime,timeSliceType");
}
logger.debug("startTime: %s, endTime: %s, timeSliceType: %s", Arrays.toString(request.get("startTime")),
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
Arrays.toString(request.get("endTime")), Arrays.toString(request.get("timeSliceType")));
long startTime;
try {
......
......@@ -10,18 +10,17 @@ import java.io.IOException;
* @author pengys5
*/
public class NodeRefResSumIndex extends AbstractIndex {
// Elasticsearch field-name constants for the node-reference response-summary index.
// NOTE(review): the diff residue previously declared both the old mixed-case
// constants (Index, OneSecondLess, ...) and the renamed UPPER_SNAKE_CASE ones;
// only the renamed set is kept, matching Java constant-naming convention.
public static final String INDEX = "node_ref_res_sum_idx";
public static final String ONE_SECOND_LESS = "oneSecondLess";
public static final String THREE_SECOND_LESS = "threeSecondLess";
public static final String FIVE_SECOND_LESS = "fiveSecondLess";
public static final String FIVE_SECOND_GREATER = "fiveSecondGreater";
public static final String ERROR = "error";
public static final String SUMMARY = "summary";

/**
 * @return the Elasticsearch index name used for node-reference response summaries
 */
@Override
public String index() {
    // The original residue contained two consecutive return statements
    // ("return Index;" then unreachable "return INDEX;"); only the
    // post-rename return is kept.
    return INDEX;
}
@Override
......@@ -32,42 +31,42 @@ public class NodeRefResSumIndex extends AbstractIndex {
@Override
public XContentBuilder createMappingBuilder() throws IOException {
XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject(OneSecondLess)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(ThreeSecondLess)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(FiveSecondLess)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(FiveSecondGreater)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(Error)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(Summary)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(Time_Slice)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
.startObject()
.startObject("properties")
.startObject(ONE_SECOND_LESS)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(THREE_SECOND_LESS)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(FIVE_SECOND_LESS)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(FIVE_SECOND_GREATER)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(ERROR)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(SUMMARY)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.startObject(AGG_COLUMN)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject(TIME_SLICE)
.field("type", "long")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject();
return mappingBuilder;
}
}
......@@ -24,26 +24,28 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(AbstractNodeRefAnalysis.class);
AbstractNodeRefAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
AbstractNodeRefAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
final void analyseNodeRef(TraceSegment segment, long timeSlice, long minute, long hour, long day, int second) throws Exception {
final void analyseNodeRef(TraceSegment segment, long timeSlice, long minute, long hour, long day,
int second) throws Exception {
List<Span> spanList = segment.getSpans();
if (CollectionTools.isNotEmpty(spanList)) {
for (Span span : spanList) {
JsonObject dataJsonObj = new JsonObject();
dataJsonObj.addProperty(NodeRefIndex.Time_Slice, timeSlice);
dataJsonObj.addProperty(NodeRefIndex.FrontIsRealCode, true);
dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, true);
dataJsonObj.addProperty(NodeRefIndex.TIME_SLICE, timeSlice);
dataJsonObj.addProperty(NodeRefIndex.FRONT_IS_REAL_CODE, true);
dataJsonObj.addProperty(NodeRefIndex.BEHIND_IS_REAL_CODE, true);
if (Tags.SPAN_KIND_CLIENT.equals(Tags.SPAN_KIND.get(span)) && ClientSpanIsLeafTools.isLeaf(span.getSpanId(), spanList)) {
String front = segment.getApplicationCode();
dataJsonObj.addProperty(NodeRefIndex.Front, front);
dataJsonObj.addProperty(NodeRefIndex.FRONT, front);
String behind = SpanPeersTools.INSTANCE.getPeers(span);
dataJsonObj.addProperty(NodeRefIndex.Behind, behind);
dataJsonObj.addProperty(NodeRefIndex.BehindIsRealCode, false);
dataJsonObj.addProperty(NodeRefIndex.BEHIND, behind);
dataJsonObj.addProperty(NodeRefIndex.BEHIND_IS_REAL_CODE, false);
String id = timeSlice + Const.ID_SPLIT + front + Const.ID_SPLIT + behind;
logger.debug("dag node ref: %s", dataJsonObj.toString());
......@@ -52,10 +54,10 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
} else if (Tags.SPAN_KIND_SERVER.equals(Tags.SPAN_KIND.get(span))) {
if (span.getParentSpanId() == -1 && CollectionTools.isEmpty(segment.getRefs())) {
String behind = segment.getApplicationCode();
dataJsonObj.addProperty(NodeRefIndex.Behind, behind);
dataJsonObj.addProperty(NodeRefIndex.BEHIND, behind);
String front = Const.USER_CODE;
dataJsonObj.addProperty(NodeRefIndex.Front, front);
dataJsonObj.addProperty(NodeRefIndex.FRONT, front);
String id = timeSlice + Const.ID_SPLIT + front + Const.ID_SPLIT + behind;
setRecord(id, dataJsonObj);
......@@ -66,7 +68,8 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
}
}
private void buildNodeRefResRecordData(String nodeRefId, Span span, long minute, long hour, long day, int second) throws Exception {
private void buildNodeRefResRecordData(String nodeRefId, Span span, long minute, long hour, long day,
int second) throws Exception {
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord = new AbstractNodeRefResSumAnalysis.NodeRefResRecord(minute, hour, day, second);
refResRecord.setStartTime(span.getStartTime());
refResRecord.setEndTime(span.getEndTime());
......@@ -75,5 +78,6 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
sendToResSumAnalysis(refResRecord);
}
protected abstract void sendToResSumAnalysis(AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception;
protected abstract void sendToResSumAnalysis(
AbstractNodeRefResSumAnalysis.NodeRefResRecord refResRecord) throws Exception;
}
......@@ -11,7 +11,8 @@ import com.a.eye.skywalking.collector.worker.storage.AbstractTimeSlice;
*/
abstract class AbstractNodeRefResSumAnalysis extends MetricAnalysisMember {
AbstractNodeRefResSumAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
AbstractNodeRefResSumAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -21,23 +22,23 @@ abstract class AbstractNodeRefResSumAnalysis extends MetricAnalysisMember {
boolean isError = nodeRefRes.isError;
long cost = endTime - startTime;
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.OneSecondLess, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ThreeSecondLess, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondLess, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondGreater, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.Error, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ONE_SECOND_LESS, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.THREE_SECOND_LESS, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_LESS, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_GREATER, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ERROR, 0L);
if (cost <= 1000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.OneSecondLess, 1L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ONE_SECOND_LESS, 1L);
} else if (1000 < cost && cost <= 3000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ThreeSecondLess, 1L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.THREE_SECOND_LESS, 1L);
} else if (3000 < cost && cost <= 5000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondLess, 1L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_LESS, 1L);
} else if (5000 < cost && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FiveSecondGreater, 1L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_GREATER, 1L);
} else {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.Error, 1L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ERROR, 1L);
}
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.Summary, 1L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.SUMMARY, 1L);
}
public static class NodeRefResRecord extends AbstractTimeSlice {
......
......@@ -17,7 +17,8 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
protected NodeRefDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
protected NodeRefDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -30,7 +31,7 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
long minute = segmentWithTimeSlice.getMinute();
......@@ -70,7 +71,7 @@ public class NodeRefDayAnalysis extends AbstractNodeRefAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefDayAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefDayAnalysis.SIZE;
}
}
......
......@@ -17,7 +17,8 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
protected NodeRefHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
protected NodeRefHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -30,7 +31,7 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
long minute = segmentWithTimeSlice.getMinute();
......@@ -70,7 +71,7 @@ public class NodeRefHourAnalysis extends AbstractNodeRefAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefHourAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefHourAnalysis.SIZE;
}
}
......
......@@ -17,7 +17,8 @@ import com.a.eye.skywalking.trace.TraceSegment;
*/
public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
protected NodeRefMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
protected NodeRefMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -30,7 +31,7 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
@Override
public void analyse(Object message) throws Exception {
if (message instanceof SegmentPost.SegmentWithTimeSlice) {
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice) message;
SegmentPost.SegmentWithTimeSlice segmentWithTimeSlice = (SegmentPost.SegmentWithTimeSlice)message;
TraceSegment segment = segmentWithTimeSlice.getTraceSegment();
long minute = segmentWithTimeSlice.getMinute();
long hour = segmentWithTimeSlice.getHour();
......@@ -53,7 +54,6 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefMinuteAnalysis> {
public static Factory INSTANCE = new Factory();
......@@ -70,7 +70,7 @@ public class NodeRefMinuteAnalysis extends AbstractNodeRefAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefMinuteAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefMinuteAnalysis.SIZE;
}
}
......
......@@ -14,14 +14,15 @@ import com.a.eye.skywalking.collector.worker.storage.MetricData;
*/
public class NodeRefResSumDayAnalysis extends AbstractNodeRefResSumAnalysis {
NodeRefResSumDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumDayAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
NodeRefResRecord refResRecord = (NodeRefResRecord)message;
analyseResSum(refResRecord);
}
}
......@@ -49,7 +50,7 @@ public class NodeRefResSumDayAnalysis extends AbstractNodeRefResSumAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefResSumDayAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefResSumDayAnalysis.SIZE;
}
}
......
......@@ -14,14 +14,15 @@ import com.a.eye.skywalking.collector.worker.storage.MetricData;
*/
public class NodeRefResSumHourAnalysis extends AbstractNodeRefResSumAnalysis {
NodeRefResSumHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumHourAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
NodeRefResRecord refResRecord = (NodeRefResRecord)message;
analyseResSum(refResRecord);
}
}
......@@ -49,7 +50,7 @@ public class NodeRefResSumHourAnalysis extends AbstractNodeRefResSumAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefResSumHourAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefResSumHourAnalysis.SIZE;
}
}
......
......@@ -14,14 +14,15 @@ import com.a.eye.skywalking.collector.worker.storage.MetricData;
*/
public class NodeRefResSumMinuteAnalysis extends AbstractNodeRefResSumAnalysis {
NodeRefResSumMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefResSumMinuteAnalysis(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
if (message instanceof NodeRefResRecord) {
NodeRefResRecord refResRecord = (NodeRefResRecord) message;
NodeRefResRecord refResRecord = (NodeRefResRecord)message;
analyseResSum(refResRecord);
}
}
......@@ -49,7 +50,7 @@ public class NodeRefResSumMinuteAnalysis extends AbstractNodeRefResSumAnalysis {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefResSumMinuteAnalysis.Size;
return WorkerConfig.Queue.NodeRef.NodeRefResSumMinuteAnalysis.SIZE;
}
}
......
......@@ -15,7 +15,8 @@ public class NodeRefDayAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefDayAgg.class);
NodeRefDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefDayAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +49,7 @@ public class NodeRefDayAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.NodeRef.NodeRefDayAgg.Value;
return WorkerConfig.WorkerNum.NodeRef.NodeRefDayAgg.VALUE;
}
}
......
......@@ -14,18 +14,19 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefDaySave extends RecordPersistenceMember {
NodeRefDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefDaySave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeRefIndex.Index;
return NodeRefIndex.INDEX;
}
@Override
public String esType() {
return NodeRefIndex.Type_Day;
return NodeRefIndex.TYPE_DAY;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefDaySave> {
......@@ -44,7 +45,7 @@ public class NodeRefDaySave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefDaySave.Size;
return WorkerConfig.Queue.NodeRef.NodeRefDaySave.SIZE;
}
}
......
......@@ -15,7 +15,8 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
private Logger logger = LogManager.getFormatterLogger(NodeRefHourAgg.class);
NodeRefHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefHourAgg(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
......@@ -48,7 +49,7 @@ public class NodeRefHourAgg extends AbstractClusterWorker {
@Override
public int workerNum() {
return WorkerConfig.WorkerNum.NodeRef.NodeRefHourAgg.Value;
return WorkerConfig.WorkerNum.NodeRef.NodeRefHourAgg.VALUE;
}
}
......
......@@ -14,18 +14,19 @@ import com.a.eye.skywalking.collector.worker.noderef.NodeRefIndex;
*/
public class NodeRefHourSave extends RecordPersistenceMember {
NodeRefHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
NodeRefHourSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return NodeRefIndex.Index;
return NodeRefIndex.INDEX;
}
@Override
public String esType() {
return NodeRefIndex.Type_Hour;
return NodeRefIndex.TYPE_HOUR;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefHourSave> {
......@@ -44,7 +45,7 @@ public class NodeRefHourSave extends RecordPersistenceMember {
@Override
public int queueSize() {
return WorkerConfig.Queue.NodeRef.NodeRefHourSave.Size;
return WorkerConfig.Queue.NodeRef.NodeRefHourSave.SIZE;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册