提交 c82ad286 编写于 作者: P pengys5

config of system start

上级 4960fa80
package com.a.eye.skywalking.collector.cluster.config;
/**
* Created by pengys5 on 2017/2/22 0022.
*/
public class CollectorConfig {
public static final String appname = "CollectorSystem";
public static class Collector {
public static String hostname = "127.0.0.1";
public static String port = "2551";
public static String cluster = "127.0.0.1:2551";
}
}
package com.a.eye.skywalking.collector.cluster.config;
import com.a.eye.skywalking.api.conf.Config;
import com.a.eye.skywalking.api.logging.api.ILog;
import com.a.eye.skywalking.api.logging.api.LogManager;
import com.a.eye.skywalking.api.util.ConfigInitializer;
import com.a.eye.skywalking.api.util.StringUtil;
import java.io.InputStream;
import java.util.Properties;
/**
* Created by pengys5 on 2017/2/22 0022.
*/
public class CollectorConfigInitializer {
private static ILog logger = LogManager.getLogger(CollectorConfigInitializer.class);
public static void initialize() {
InputStream configFileStream = CollectorConfigInitializer.class.getResourceAsStream("/collector.config");
if (configFileStream == null) {
logger.info("Not provide sky-walking certification documents, sky-walking api run in default config.");
} else {
try {
Properties properties = new Properties();
properties.load(configFileStream);
ConfigInitializer.initialize(properties, CollectorConfig.class);
} catch (Exception e) {
logger.error("Failed to read the config file, sky-walking api run in default config.", e);
}
}
if (!StringUtil.isEmpty(System.getProperty("collector.hostname"))) {
CollectorConfig.Collector.hostname = System.getProperty("collector.hostname");
}
if (!StringUtil.isEmpty(System.getProperty("collector.port"))) {
CollectorConfig.Collector.port = System.getProperty("collector.port");
}
if (!StringUtil.isEmpty(System.getProperty("collector.cluster"))) {
CollectorConfig.Collector.cluster = System.getProperty("collector.cluster");
}
}
}
package com.a.eye.skywalking.collector.cluster.producer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import com.a.eye.skywalking.collector.cluster.Const;
import com.a.eye.skywalking.collector.cluster.consumer.TraceConsumerActor;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfigInitializer;
import com.a.eye.skywalking.collector.cluster.manager.ActorManagerActor;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.a.eye.skywalking.collector.cluster.message.TraceMessages.TransformationJob;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.util.Timeout;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static akka.pattern.Patterns.ask;
/**
* {@link TraceProducerApp} is a producer for trace agent to send @link TraceSegment.
* <p>
* Created by pengys5 on 2017/2/17.
*/
public class TraceProducerApp {
public static void main(String[] args) {
// Override the configuration of the port when specified as program argument
final String port = args.length > 0 ? args[0] : "2552";
final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
withFallback(ConfigFactory.load());
final Config config = TraceProducerApp.buildConfig();
ActorSystem system = ActorSystem.create("ClusterSystem", config);
ActorSystem system = ActorSystem.create(CollectorConfig.appname, config);
system.actorOf(Props.create(ActorManagerActor.class), Const.Actor_Manager_Role);
final ActorRef frontend = system.actorOf(Props.create(TraceProducerActor.class), Const.Trace_Producer_Role);
......@@ -39,19 +40,44 @@ public class TraceProducerApp {
final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ExecutionContext ec = system.dispatcher();
final AtomicInteger counter = new AtomicInteger();
system.scheduler().schedule(interval, interval, new Runnable() {
public void run() {
// TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace();
ask(frontend,
new TransformationJob("hello-" + counter.incrementAndGet(), null),
timeout).onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
}
}, ec);
}
system.scheduler().schedule(interval, interval, () -> {
ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet(), null), timeout).onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
System.out.println(result);
}
}, ec);
}, ec);
}
public static Config buildConfig() {
CollectorConfigInitializer.initialize();
Config config = ConfigFactory.parseString("akka.actor.provider = akka.cluster.ClusterActorRefProvider")
.withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname = " + CollectorConfig.Collector.hostname))
.withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port = " + CollectorConfig.Collector.port))
.withFallback(ConfigFactory.parseString("akka.remote.log-remote-lifecycle-events = off"))
.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes = [" + TraceProducerApp.buildSeedNodes(CollectorConfig.Collector.cluster) + "]"))
.withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = 10s"))
.withFallback(ConfigFactory.parseString("akka.cluster.roles = [Actor_Manager_Role, Trace_Producer_Role, Trace_Consumer_Role]"))
.withFallback(ConfigFactory.parseString("akka.cluster.metrics.enabled = off"));
// .withFallback(ConfigFactory.load());
return config;
}
public static String buildSeedNodes(String cluster) {
String[] clusters = cluster.split(",");
StringBuffer seedNodes = new StringBuffer();
for (int i = 0; i < clusters.length; i++) {
if (i > 0) {
seedNodes.append(",");
}
seedNodes.append("\"akka.tcp://").append(CollectorConfig.appname).append("@");
seedNodes.append(clusters[i]).append("\"");
}
return seedNodes.toString();
}
}
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfig;
import com.a.eye.skywalking.collector.cluster.config.CollectorConfigInitializer;
import com.a.eye.skywalking.collector.cluster.producer.TraceProducerApp;
import com.typesafe.config.Config;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
/**
* Created by pengys5 on 2017/2/22 0022.
*/
public class CollectorConfigTestCase {
@Test
public void testConfigInitializer() {
System.setProperty("collector.hostname", "192.168.0.1");
System.setProperty("collector.port", "1000");
System.setProperty("collector.cluster", "192.168.0.1:1000");
CollectorConfigInitializer.initialize();
Assert.assertEquals("192.168.0.1", CollectorConfig.Collector.hostname);
Assert.assertEquals("1000", CollectorConfig.Collector.port);
Assert.assertEquals("192.168.0.1:1000", CollectorConfig.Collector.cluster);
}
@Test
public void testBuildSeedNodes() {
String seedNodesContainOne = TraceProducerApp.buildSeedNodes("192.168.0.1:1000");
Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1000\"", seedNodesContainOne);
String seedNodesContainTwo = TraceProducerApp.buildSeedNodes("192.168.0.1:1001,192.168.0.2:1002");
Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1001\",\"akka.tcp://CollectorSystem@192.168.0.2:1002\"", seedNodesContainTwo);
String seedNodesContainThree = TraceProducerApp.buildSeedNodes("192.168.0.1:1001,192.168.0.2:1002,192.168.0.3:1003");
Assert.assertEquals("\"akka.tcp://CollectorSystem@192.168.0.1:1001\",\"akka.tcp://CollectorSystem@192.168.0.2:1002\",\"akka.tcp://CollectorSystem@192.168.0.3:1003\"", seedNodesContainThree);
}
@Test
public void testBuildConfig() {
Config config = TraceProducerApp.buildConfig();
Assert.assertEquals("akka.cluster.ClusterActorRefProvider", config.getString("akka.actor.provider"));
Assert.assertEquals("10s", config.getString("akka.cluster.auto-down-unreachable-after"));
Assert.assertEquals("off", config.getString("akka.cluster.metrics.enabled"));
Assert.assertEquals("off", config.getString("akka.remote.log-remote-lifecycle-events"));
Assert.assertEquals("127.0.0.1", config.getString("akka.remote.netty.tcp.hostname"));
Assert.assertEquals("2551", config.getString("akka.remote.netty.tcp.port"));
String[] roles = {"Actor_Manager_Role", "Trace_Producer_Role", "Trace_Consumer_Role"};
Assert.assertArrayEquals(roles, config.getStringList("akka.cluster.roles").toArray());
}
}
package com.a.eye.skywalking.collector.cluster;
import com.a.eye.skywalking.sniffer.mock.trace.TraceSegmentBuilderFactory;
import com.a.eye.skywalking.trace.TraceSegment;
import org.junit.Test;
/**
* Created by pengys5 on 2017/2/22 0022.
*/
public class TraceSegmentTestCase {
@Test
public void testProducerSend() {
TraceSegment traceSegment = TraceSegmentBuilderFactory.INSTANCE.singleTomcat200Trace();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册