提交 1cb05658 编写于 作者: P peng-yongsheng

cluster, naming, grpc manager, jetty manager, storage modules test successful.

上级 71d27b18
......@@ -21,7 +21,10 @@ package org.skywalking.apm.collector.agent.grpc;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
......@@ -29,6 +32,8 @@ import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
......@@ -38,11 +43,12 @@ import org.skywalking.apm.collector.storage.service.DAOService;
*/
public class AgentModuleGRPCProvider extends ModuleProvider {
public static final String NAME = "gRPC";
private static final String HOST = "host";
private static final String PORT = "port";
@Override public String name() {
return "gRPC";
return NAME;
}
@Override public Class<? extends Module> module() {
......@@ -61,10 +67,17 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
AgentGRPCNamingListener namingListener = new AgentGRPCNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(namingListener);
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new AgentGRPCNamingHandler(namingListener));
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.getOrCreateIfAbsent(host, port);
Server gRPCServer = managerService.createIfAbsent(host, port);
addHandlers(daoService, gRPCServer);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
......@@ -76,7 +89,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[0];
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME};
}
private void addHandlers(DAOService daoService, Server gRPCServer) {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler.naming;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
* @author peng-yongsheng
*/
public class AgentGRPCNamingHandler extends JettyHandler {
private final AgentGRPCNamingListener namingListener;
public AgentGRPCNamingHandler(AgentGRPCNamingListener namingListener) {
this.namingListener = namingListener;
}
@Override public String pathSpec() {
return "/agent/gRPC";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
Set<String> servers = namingListener.getAddresses();
JsonArray serverArray = new JsonArray();
servers.forEach(serverArray::add);
return serverArray;
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.AgentModuleGRPCProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
/**
* @author peng-yongsheng
*/
public class AgentGRPCNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleGRPCProvider.NAME;
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
......@@ -41,5 +41,10 @@
<artifactId>collector-agent-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-jetty-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -20,17 +20,36 @@ package org.skywalking.apm.collector.agent.jetty;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public class AgentModuleJettyProvider extends ModuleProvider {
public static final String NAME = "jetty";
private static final String HOST = "host";
private static final String PORT = "port";
private static final String CONTEXT_PATH = "context_path";
@Override public String name() {
return "jetty";
return NAME;
}
@Override public Class<? extends Module> module() {
......@@ -42,7 +61,29 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
String contextPath = config.getProperty(CONTEXT_PATH);
try {
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
AgentJettyNamingListener namingListener = new AgentJettyNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(namingListener);
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(daoService, jettyServer);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......@@ -50,6 +91,10 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[0];
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME};
}
private void addHandlers(DAOService daoService, Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
/**
* @author peng-yongsheng
*/
public class AgentModuleJettyRegistration extends ModuleRegistration {
private final String host;
private final int port;
private final String contextPath;
public AgentModuleJettyRegistration(String host, int port, String contextPath) {
this.host = host;
this.port = port;
this.contextPath = contextPath;
}
@Override public Value buildValue() {
return new Value(host, port, contextPath);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler;
import com.google.gson.JsonElement;
import com.google.gson.stream.JsonReader;
import java.io.BufferedReader;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class TraceSegmentServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
@Override public String pathSpec() {
return "/segments";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
logger.debug("receive stream segment");
try {
BufferedReader bufferedReader = req.getReader();
read(bufferedReader);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return null;
}
private TraceSegmentJsonReader jsonReader = new TraceSegmentJsonReader();
private void read(BufferedReader bufferedReader) throws IOException {
JsonReader reader = new JsonReader(bufferedReader);
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse(null);
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
}
reader.endArray();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.naming;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
/**
* @author peng-yongsheng
*/
public class AgentJettyNamingHandler extends JettyHandler {
private final AgentJettyNamingListener namingListener;
public AgentJettyNamingHandler(AgentJettyNamingListener namingListener) {
this.namingListener = namingListener;
}
@Override public String pathSpec() {
return "/agent/jetty";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
Set<String> servers = namingListener.getAddresses();
JsonArray serverArray = new JsonArray();
servers.forEach(serverArray::add);
return serverArray;
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.AgentModuleJettyProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
/**
* @author peng-yongsheng
*/
public class AgentJettyNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleJettyProvider.NAME;
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
}
@Override public void serverQuitNotify(String serverAddress) {
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.KeyWithStringValue;
/**
* @author peng-yongsheng
*/
public class KeyWithStringValueJsonReader implements StreamJsonReader<KeyWithStringValue> {
private static final String KEY = "k";
private static final String VALUE = "v";
@Override public KeyWithStringValue read(JsonReader reader) throws IOException {
KeyWithStringValue.Builder builder = KeyWithStringValue.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case KEY:
builder.setKey(reader.nextString());
break;
case VALUE:
builder.setValue(reader.nextString());
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.LogMessage;
/**
* @author peng-yongsheng
*/
public class LogJsonReader implements StreamJsonReader<LogMessage> {
private KeyWithStringValueJsonReader keyWithStringValueJsonReader = new KeyWithStringValueJsonReader();
private static final String TIME = "ti";
private static final String LOG_DATA = "ld";
@Override public LogMessage read(JsonReader reader) throws IOException {
LogMessage.Builder builder = LogMessage.newBuilder();
while (reader.hasNext()) {
switch (reader.nextName()) {
case TIME:
builder.setTime(reader.nextLong());
case LOG_DATA:
reader.beginArray();
while (reader.hasNext()) {
builder.addData(keyWithStringValueJsonReader.read(reader));
}
reader.endArray();
default:
reader.skipValue();
}
}
return builder.build();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.TraceSegmentReference;
/**
* @author peng-yongsheng
*/
public class ReferenceJsonReader implements StreamJsonReader<TraceSegmentReference> {
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private static final String PARENT_TRACE_SEGMENT_ID = "ts";
private static final String PARENT_APPLICATION_ID = "ai";
private static final String PARENT_SPAN_ID = "si";
private static final String PARENT_SERVICE_ID = "vi";
private static final String PARENT_SERVICE_NAME = "vn";
private static final String NETWORK_ADDRESS_ID = "ni";
private static final String NETWORK_ADDRESS = "nn";
private static final String ENTRY_APPLICATION_INSTANCE_ID = "ea";
private static final String ENTRY_SERVICE_ID = "ei";
private static final String ENTRY_SERVICE_NAME = "en";
private static final String REF_TYPE_VALUE = "rv";
@Override public TraceSegmentReference read(JsonReader reader) throws IOException {
TraceSegmentReference.Builder builder = TraceSegmentReference.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case PARENT_TRACE_SEGMENT_ID:
builder.setParentTraceSegmentId(uniqueIdJsonReader.read(reader));
break;
case PARENT_APPLICATION_ID:
builder.setParentApplicationInstanceId(reader.nextInt());
break;
case PARENT_SPAN_ID:
builder.setParentSpanId(reader.nextInt());
break;
case PARENT_SERVICE_ID:
builder.setParentServiceId(reader.nextInt());
break;
case PARENT_SERVICE_NAME:
builder.setParentServiceName(reader.nextString());
break;
case NETWORK_ADDRESS_ID:
builder.setNetworkAddressId(reader.nextInt());
break;
case NETWORK_ADDRESS:
builder.setNetworkAddress(reader.nextString());
break;
case ENTRY_APPLICATION_INSTANCE_ID:
builder.setEntryApplicationInstanceId(reader.nextInt());
break;
case ENTRY_SERVICE_ID:
builder.setEntryServiceId(reader.nextInt());
break;
case ENTRY_SERVICE_NAME:
builder.setEntryServiceName(reader.nextString());
break;
case REF_TYPE_VALUE:
builder.setRefTypeValue(reader.nextInt());
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class SegmentJsonReader implements StreamJsonReader<TraceSegmentObject.Builder> {
private final Logger logger = LoggerFactory.getLogger(SegmentJsonReader.class);
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private ReferenceJsonReader referenceJsonReader = new ReferenceJsonReader();
private SpanJsonReader spanJsonReader = new SpanJsonReader();
private static final String TRACE_SEGMENT_ID = "ts";
private static final String APPLICATION_ID = "ai";
private static final String APPLICATION_INSTANCE_ID = "ii";
private static final String TRACE_SEGMENT_REFERENCE = "rs";
private static final String SPANS = "ss";
@Override public TraceSegmentObject.Builder read(JsonReader reader) throws IOException {
TraceSegmentObject.Builder builder = TraceSegmentObject.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case TRACE_SEGMENT_ID:
builder.setTraceSegmentId(uniqueIdJsonReader.read(reader));
if (logger.isDebugEnabled()) {
StringBuilder segmentId = new StringBuilder();
builder.getTraceSegmentId().getIdPartsList().forEach(idPart -> segmentId.append(idPart));
logger.debug("segment id: {}", segmentId);
}
break;
case APPLICATION_ID:
builder.setApplicationId(reader.nextInt());
break;
case APPLICATION_INSTANCE_ID:
builder.setApplicationInstanceId(reader.nextInt());
break;
case TRACE_SEGMENT_REFERENCE:
reader.beginArray();
while (reader.hasNext()) {
builder.addRefs(referenceJsonReader.read(reader));
}
reader.endArray();
break;
case SPANS:
reader.beginArray();
while (reader.hasNext()) {
builder.addSpans(spanJsonReader.read(reader));
}
reader.endArray();
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.SpanObject;
/**
* @author peng-yongsheng
*/
public class SpanJsonReader implements StreamJsonReader<SpanObject> {
private KeyWithStringValueJsonReader keyWithStringValueJsonReader = new KeyWithStringValueJsonReader();
private LogJsonReader logJsonReader = new LogJsonReader();
private static final String SPAN_ID = "si";
private static final String SPAN_TYPE_VALUE = "tv";
private static final String SPAN_LAYER_VALUE = "lv";
private static final String PARENT_SPAN_ID = "ps";
private static final String START_TIME = "st";
private static final String END_TIME = "et";
private static final String COMPONENT_ID = "ci";
private static final String COMPONENT_NAME = "cn";
private static final String OPERATION_NAME_ID = "oi";
private static final String OPERATION_NAME = "on";
private static final String PEER_ID = "pi";
private static final String PEER = "pn";
private static final String IS_ERROR = "ie";
private static final String TAGS = "to";
private static final String LOGS = "lo";
@Override public SpanObject read(JsonReader reader) throws IOException {
SpanObject.Builder builder = SpanObject.newBuilder();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case SPAN_ID:
builder.setSpanId(reader.nextInt());
break;
case SPAN_TYPE_VALUE:
builder.setSpanTypeValue(reader.nextInt());
break;
case SPAN_LAYER_VALUE:
builder.setSpanLayerValue(reader.nextInt());
break;
case PARENT_SPAN_ID:
builder.setParentSpanId(reader.nextInt());
break;
case START_TIME:
builder.setStartTime(reader.nextLong());
break;
case END_TIME:
builder.setEndTime(reader.nextLong());
break;
case COMPONENT_ID:
builder.setComponentId(reader.nextInt());
break;
case COMPONENT_NAME:
builder.setComponent(reader.nextString());
break;
case OPERATION_NAME_ID:
builder.setOperationNameId(reader.nextInt());
break;
case OPERATION_NAME:
builder.setOperationName(reader.nextString());
break;
case PEER_ID:
builder.setPeerId(reader.nextInt());
break;
case PEER:
builder.setPeer(reader.nextString());
break;
case IS_ERROR:
builder.setIsError(reader.nextBoolean());
break;
case TAGS:
reader.beginArray();
while (reader.hasNext()) {
builder.addTags(keyWithStringValueJsonReader.read(reader));
}
reader.endArray();
break;
case LOGS:
reader.beginArray();
while (reader.hasNext()) {
builder.addLogs(logJsonReader.read(reader));
}
reader.endArray();
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return builder.build();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public interface StreamJsonReader<T> {
T read(JsonReader reader) throws IOException;
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author peng-yongsheng
*/
public class TraceSegment {
private UpstreamSegment.Builder builder;
public TraceSegment() {
builder = UpstreamSegment.newBuilder();
}
public void addGlobalTraceId(UniqueId.Builder globalTraceId) {
builder.addGlobalTraceIds(globalTraceId);
}
public void setTraceSegmentBuilder(TraceSegmentObject.Builder traceSegmentBuilder) {
builder.setSegment(traceSegmentBuilder.build().toByteString());
}
public UpstreamSegment getUpstreamSegment() {
return builder.build();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class TraceSegmentJsonReader implements StreamJsonReader<TraceSegment> {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentJsonReader.class);
private UniqueIdJsonReader uniqueIdJsonReader = new UniqueIdJsonReader();
private SegmentJsonReader segmentJsonReader = new SegmentJsonReader();
private static final String GLOBAL_TRACE_IDS = "gt";
private static final String SEGMENT = "sg";
@Override public TraceSegment read(JsonReader reader) throws IOException {
TraceSegment traceSegment = new TraceSegment();
reader.beginObject();
while (reader.hasNext()) {
switch (reader.nextName()) {
case GLOBAL_TRACE_IDS:
reader.beginArray();
while (reader.hasNext()) {
traceSegment.addGlobalTraceId(uniqueIdJsonReader.read(reader));
}
reader.endArray();
break;
case SEGMENT:
traceSegment.setTraceSegmentBuilder(segmentJsonReader.read(reader));
break;
default:
reader.skipValue();
break;
}
}
reader.endObject();
return traceSegment;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.jetty.handler.reader;
import com.google.gson.stream.JsonReader;
import java.io.IOException;
import org.skywalking.apm.network.proto.UniqueId;
/**
* @author peng-yongsheng
*/
public class UniqueIdJsonReader implements StreamJsonReader<UniqueId.Builder> {
@Override public UniqueId.Builder read(JsonReader reader) throws IOException {
UniqueId.Builder builder = UniqueId.newBuilder();
reader.beginArray();
while (reader.hasNext()) {
builder.addIdParts(reader.nextLong());
}
reader.endArray();
return builder;
}
}
......@@ -24,5 +24,15 @@
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
cluster:
zookeeper:
hostPort: localhost:2181
sessionTimeout: 100000
naming:
jetty:
host: localhost
port: 10800
context_path: /
#agent_stream:
# grpc:
# host: localhost
# port: 11800
# jetty:
# host: localhost
# port: 12800
# context_path: /
# config:
# buffer_offset_max_file_size: 10M
# buffer_segment_max_file_size: 500M
agent:
gRPC:
host: localhost
port: 11800
jetty:
host: localhost
port: 12800
context_path: /
config:
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
ui:
jetty:
host: localhost
port: 12800
context_path: /
#collector_inside:
# grpc:
# host: localhost
# port: 11800
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
#storage:
# h2:
# url: jdbc:h2:tcp://localhost/~/test
# user_name: sa
\ No newline at end of file
index_replicas_number: 0
\ No newline at end of file
......@@ -19,18 +19,33 @@
package org.skywalking.apm.collector.cluster.standalone;
import java.util.Properties;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.cluster.standalone.service.StandaloneModuleListenerService;
import org.skywalking.apm.collector.cluster.standalone.service.StandaloneModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.core.util.Const;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ClusterModuleStandaloneProvider extends ModuleProvider {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleStandaloneProvider.class);
private static final String URL = "url";
private static final String USER_NAME = "user_name";
private H2Client h2Client;
private ClusterStandaloneDataMonitor dataMonitor;
@Override public String name() {
return "standalone";
}
......@@ -40,11 +55,23 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(ModuleRegisterService.class, new StandaloneModuleRegisterService());
this.dataMonitor = new ClusterStandaloneDataMonitor();
final String url = config.getProperty(URL);
final String userName = config.getProperty(USER_NAME);
h2Client = new H2Client(url, userName, Const.EMPTY_STRING);
this.dataMonitor.setClient(h2Client);
this.registerServiceImplementation(ModuleListenerService.class, new StandaloneModuleListenerService(dataMonitor));
this.registerServiceImplementation(ModuleRegisterService.class, new StandaloneModuleRegisterService(dataMonitor));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
try {
h2Client.initialize();
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster.standalone;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.Client;
import org.skywalking.apm.collector.client.ClientException;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.cluster.DataMonitor;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.core.CollectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ClusterStandaloneDataMonitor implements DataMonitor {
private final Logger logger = LoggerFactory.getLogger(ClusterStandaloneDataMonitor.class);
private H2Client client;
private Map<String, ClusterModuleListener> listeners;
private Map<String, ModuleRegistration> registrations;
public ClusterStandaloneDataMonitor() {
listeners = new LinkedHashMap<>();
registrations = new LinkedHashMap<>();
}
@Override public void setClient(Client client) {
this.client = (H2Client)client;
}
@Override
public void addListener(ClusterModuleListener listener) {
String path = BASE_CATALOG + listener.path();
logger.info("listener path: {}", path);
listeners.put(path, listener);
}
@Override public ClusterModuleListener getListener(String path) {
path = BASE_CATALOG + path;
return listeners.get(path);
}
@Override public void register(String path, ModuleRegistration registration) {
registrations.put(BASE_CATALOG + path, registration);
}
@Override public void createPath(String path) throws ClientException {
}
@Override public void setData(String path, String value) throws ClientException {
if (listeners.containsKey(path)) {
listeners.get(path).addAddress(value);
listeners.get(path).serverJoinNotify(value);
}
}
public void start() throws CollectorException {
Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator();
while (entryIterator.hasNext()) {
Map.Entry<String, ModuleRegistration> next = entryIterator.next();
ModuleRegistration.Value value = next.getValue().buildValue();
String contextPath = value.getContextPath() == null ? "" : value.getContextPath();
setData(next.getKey(), value.getHostPort() + contextPath);
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.cluster.standalone.service;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.standalone.ClusterStandaloneDataMonitor;
/**
* @author peng-yongsheng
*/
public class StandaloneModuleListenerService implements ModuleListenerService {
private final ClusterStandaloneDataMonitor dataMonitor;
public StandaloneModuleListenerService(ClusterStandaloneDataMonitor dataMonitor) {
this.dataMonitor = dataMonitor;
}
@Override public void addListener(ClusterModuleListener listener) {
dataMonitor.addListener(listener);
}
}
......@@ -20,13 +20,21 @@ package org.skywalking.apm.collector.cluster.standalone.service;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.cluster.standalone.ClusterStandaloneDataMonitor;
/**
* @author peng-yongsheng
*/
public class StandaloneModuleRegisterService implements ModuleRegisterService {
@Override public void register(String moduleName, String providerName, ModuleRegistration registration) {
private final ClusterStandaloneDataMonitor dataMonitor;
public StandaloneModuleRegisterService(ClusterStandaloneDataMonitor dataMonitor) {
this.dataMonitor = dataMonitor;
}
@Override public void register(String moduleName, String providerName, ModuleRegistration registration) {
String path = "/" + moduleName + "/" + providerName;
dataMonitor.register(path, registration);
}
}
}
\ No newline at end of file
......@@ -73,6 +73,7 @@ public class JettyServer implements Server {
}
@Override public void start() throws ServerException {
logger.info("start server, host: {}, port: {}", host, port);
try {
for (ServletMapping servletMapping : servletContextHandler.getServletHandler().getServletMappings()) {
logger.info("jetty servlet mappings: {} register by {}", servletMapping.getPathSpecs(), servletMapping.getServletName());
......
......@@ -96,7 +96,7 @@ public abstract class ModuleProvider {
Service service) throws ServiceNotProvidedException {
if (serviceType.isInstance(service)) {
if (manager.isServiceInstrument()) {
service = ServiceInstrumentation.INSTANCE.buildServiceUnderMonitor(module.name(), name(), service);
// service = ServiceInstrumentation.INSTANCE.buildServiceUnderMonitor(module.name(), name(), service);
}
this.services.put(serviceType, service);
} else {
......
cluster:
h2:
hostPort: localhost:2181
sessionTimeout: 100000
standalone:
url: jdbc:h2:~/memorydb
user_name: sa
cache:
guava:
naming:
jetty:
host: localhost
port: 10800
context_path: /
#agent_stream:
# grpc:
# host: localhost
# port: 11800
# jetty:
# host: localhost
# port: 12800
# context_path: /
# config:
# buffer_offset_max_file_size: 10M
# buffer_segment_max_file_size: 500M
ui:
jetty:
host: localhost
......@@ -27,12 +18,7 @@ jetty_manager:
jetty:
gRPC_manager:
gRPC:
#collector_inside:
# grpc:
# host: localhost
# port: 11800
storage:
h2:
url: jdbc:h2:~/memorydb
user_name: sa
password:
\ No newline at end of file
user_name: sa
\ No newline at end of file
......@@ -25,5 +25,5 @@ import org.skywalking.apm.collector.server.Server;
* @author peng-yongsheng
*/
public interface GRPCManagerService extends Service {
Server getOrCreateIfAbsent(String host, int port);
Server createIfAbsent(String host, int port);
}
......@@ -38,7 +38,7 @@ public class GRPCManagerServiceImpl implements GRPCManagerService {
this.servers = servers;
}
@Override public Server getOrCreateIfAbsent(String host, int port) {
@Override public Server createIfAbsent(String host, int port) {
String id = host + String.valueOf(port);
if (servers.containsKey(id)) {
return servers.get(id);
......
......@@ -20,10 +20,13 @@ package org.skywalking.apm.collector.jetty.manager.service;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.server.ServerHandler;
/**
* @author peng-yongsheng
*/
public interface JettyManagerService extends Service {
Server getOrCreateIfAbsent(String host, int port, String contextPath);
Server createIfAbsent(String host, int port, String contextPath);
void addHandler(String host, int port, ServerHandler serverHandler);
}
......@@ -19,8 +19,10 @@
package org.skywalking.apm.collector.jetty.manager.service;
import java.util.Map;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.server.ServerException;
import org.skywalking.apm.collector.server.ServerHandler;
import org.skywalking.apm.collector.server.jetty.JettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,7 +40,7 @@ public class JettyManagerServiceImpl implements JettyManagerService {
this.servers = servers;
}
@Override public Server getOrCreateIfAbsent(String host, int port, String contextPath) {
@Override public Server createIfAbsent(String host, int port, String contextPath) {
String id = host + String.valueOf(port);
if (servers.containsKey(id)) {
return servers.get(id);
......@@ -53,4 +55,13 @@ public class JettyManagerServiceImpl implements JettyManagerService {
return server;
}
}
@Override public void addHandler(String host, int port, ServerHandler serverHandler) {
String id = host + String.valueOf(port);
if (servers.containsKey(id)) {
servers.get(id).addHandler(serverHandler);
} else {
throw new UnexpectedException("Please create server before add server handler.");
}
}
}
......@@ -18,8 +18,6 @@
package org.skywalking.apm.collector.naming.jetty;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.core.module.Module;
......@@ -31,8 +29,6 @@ import org.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.jetty.service.NamingJettyHandlerRegisterService;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.server.ServerHandler;
/**
* @author peng-yongsheng
......@@ -42,7 +38,6 @@ public class NamingModuleJettyProvider extends ModuleProvider {
private static final String HOST = "host";
private static final String PORT = "port";
private static final String CONTEXT_PATH = "context_path";
private final List<ServerHandler> handlers = new ArrayList<>();
@Override public String name() {
return "jetty";
......@@ -53,7 +48,9 @@ public class NamingModuleJettyProvider extends ModuleProvider {
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(NamingHandlerRegisterService.class, new NamingJettyHandlerRegisterService(handlers));
final String host = config.getProperty(HOST);
final Integer port = (Integer)config.get(PORT);
this.registerServiceImplementation(NamingHandlerRegisterService.class, new NamingJettyHandlerRegisterService(host, port, getManager()));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......@@ -63,18 +60,16 @@ public class NamingModuleJettyProvider extends ModuleProvider {
try {
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.getOrCreateIfAbsent(host, port, contextPath);
handlers.forEach(jettyServer::addHandler);
managerService.createIfAbsent(host, port, contextPath);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[] {JettyManagerModule.NAME, ClusterModule.NAME};
return new String[] {ClusterModule.NAME, JettyManagerModule.NAME};
}
}
......@@ -18,22 +18,39 @@
package org.skywalking.apm.collector.naming.jetty.service;
import java.util.List;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.ServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NamingJettyHandlerRegisterService implements NamingHandlerRegisterService {
private final List<ServerHandler> handlers;
private final Logger logger = LoggerFactory.getLogger(NamingJettyHandlerRegisterService.class);
public NamingJettyHandlerRegisterService(List<ServerHandler> handlers) {
this.handlers = handlers;
private final ModuleManager moduleManager;
private final String host;
private final int port;
public NamingJettyHandlerRegisterService(String host, int port, ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.host = host;
this.port = port;
}
@Override public void register(ServerHandler namingHandler) {
handlers.add(namingHandler);
try {
JettyManagerService managerService = moduleManager.find(JettyManagerModule.NAME).getService(JettyManagerService.class);
managerService.addHandler(this.host, this.port, namingHandler);
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
}
}
......@@ -50,7 +50,7 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
try {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.getOrCreateIfAbsent(host, port);
Server gRPCServer = managerService.createIfAbsent(host, port);
gRPCServer.addHandler(new RemoteCommonServiceHandler(listener));
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
......
org.skywalking.apm.collector.storage.es.dao.CpuMetricEsDAO
org.skywalking.apm.collector.storage.es.dao.GCMetricEsDAO
org.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsDAO
org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsDAO
org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsDAO
org.skywalking.apm.collector.storage.es.dao.ApplicationEsDAO
org.skywalking.apm.collector.storage.es.dao.InstanceEsDAO
org.skywalking.apm.collector.storage.es.dao.ServiceNameEsDAO
org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsDAO
org.skywalking.apm.collector.storage.es.dao.NodeComponentEsDAO
org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.storage.es.dao.SegmentCostEsDAO
org.skywalking.apm.collector.storage.es.dao.SegmentEsDAO
org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsDAO
org.skywalking.apm.collector.storage.es.dao.ApplicationEsCacheDAO
org.skywalking.apm.collector.storage.es.dao.InstanceEsCacheDAO
org.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO
\ No newline at end of file
org.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO
org.skywalking.apm.collector.storage.es.dao.ApplicationEsStreamDAO
org.skywalking.apm.collector.storage.es.dao.InstanceEsStreamDAO
org.skywalking.apm.collector.storage.es.dao.ServiceNameEsStreamDAO
org.skywalking.apm.collector.storage.es.dao.CpuMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.GCMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.NodeComponentEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.NodeMappingEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.SegmentCostEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsPersistenceDAO
org.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.InstPerformanceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.CpuMetricEsUIDAO
org.skywalking.apm.collector.storage.es.dao.GCMetricEsUIDAO
org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO
org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.NodeComponentEsUIDAO
org.skywalking.apm.collector.storage.es.dao.NodeMappingEsUIDAO
org.skywalking.apm.collector.storage.es.dao.NodeReferenceEsUIDAO
org.skywalking.apm.collector.storage.es.dao.SegmentCostEsUIDAO
org.skywalking.apm.collector.storage.es.dao.SegmentEsUIDAO
org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsUIDAO
\ No newline at end of file
org.skywalking.apm.collector.storage.h2.dao.CpuMetricH2DAO
org.skywalking.apm.collector.storage.h2.dao.GCMetricH2DAO
org.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2DAO
org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2DAO
org.skywalking.apm.collector.storage.h2.dao.MemoryPoolMetricH2DAO
org.skywalking.apm.collector.storage.h2.dao.ApplicationH2DAO
org.skywalking.apm.collector.storage.h2.dao.InstanceH2DAO
org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2DAO
org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.storage.h2.dao.InstPerformanceH2DAO
org.skywalking.apm.collector.storage.h2.dao.NodeComponentH2DAO
org.skywalking.apm.collector.storage.h2.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.storage.h2.dao.SegmentCostH2DAO
org.skywalking.apm.collector.storage.h2.dao.SegmentH2DAO
org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2DAO
org.skywalking.apm.collector.storage.h2.dao.ApplicationH2CacheDAO
org.skywalking.apm.collector.storage.h2.dao.InstanceH2CacheDAO
org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2CacheDAO
\ No newline at end of file
org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2CacheDAO
org.skywalking.apm.collector.storage.h2.dao.ApplicationH2StreamDAO
org.skywalking.apm.collector.storage.h2.dao.InstanceH2StreamDAO
org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2StreamDAO
org.skywalking.apm.collector.storage.h2.dao.CpuMetricH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.GCMetricH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.MemoryPoolMetricH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.InstPerformanceH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.NodeComponentH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.NodeMappingH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.NodeReferenceH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.SegmentCostH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.SegmentH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2PersistenceDAO
org.skywalking.apm.collector.storage.h2.dao.InstanceH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.InstPerformanceH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.CpuMetricH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.GCMetricH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.NodeComponentH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.NodeMappingH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.NodeReferenceH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.SegmentCostH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.SegmentH2UIDAO
org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2UIDAO
\ No newline at end of file
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.ui.jetty;
import java.util.Properties;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
......@@ -54,7 +55,6 @@ import org.skywalking.apm.collector.ui.jetty.handler.servicetree.EntryServiceGet
import org.skywalking.apm.collector.ui.jetty.handler.servicetree.ServiceTreeGetByIdHandler;
import org.skywalking.apm.collector.ui.jetty.handler.time.AllInstanceLastTimeGetHandler;
import org.skywalking.apm.collector.ui.jetty.handler.time.OneInstanceLastTimeGetHandler;
import org.skywalking.apm.collector.cache.CacheServiceManager;
/**
* @author peng-yongsheng
......@@ -97,7 +97,7 @@ public class UIModuleJettyProvider extends ModuleProvider {
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.getOrCreateIfAbsent(host, port, contextPath);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(daoService, jettyServer, cacheServiceManager);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
......@@ -109,7 +109,7 @@ public class UIModuleJettyProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, JettyManagerModule.NAME, NamingModule.NAME, CacheModule.NAME};
return new String[] {ClusterModule.NAME, JettyManagerModule.NAME, NamingModule.NAME, CacheModule.NAME, StorageModule.NAME};
}
private void addHandlers(DAOService daoService, Server jettyServer, CacheServiceManager cacheServiceManager) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册