提交 8922e974 编写于 作者: P peng-yongsheng

Application register test success.

上级 1cb05658
......@@ -20,12 +20,20 @@ package org.skywalking.apm.collector.agent.grpc;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.TraceSegmentServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener;
import org.skywalking.apm.collector.agent.stream.AgentStreamSingleton;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
......@@ -34,9 +42,12 @@ import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
* @author peng-yongsheng
......@@ -74,11 +85,17 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new AgentGRPCNamingHandler(namingListener));
CacheServiceManager cacheServiceManager = new CacheServiceManager();
cacheServiceManager.init(getManager());
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);
addHandlers(daoService, gRPCServer);
AgentStreamSingleton agentStreamSingleton = AgentStreamSingleton.getInstance(getManager(), cacheServiceManager, new WorkerCreateListener());
addHandlers(daoService, gRPCServer, cacheServiceManager, agentStreamSingleton);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
}
......@@ -89,10 +106,16 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME};
}
private void addHandlers(DAOService daoService, Server gRPCServer) {
private void addHandlers(DAOService daoService, Server gRPCServer, CacheServiceManager cacheServiceManager,
AgentStreamSingleton agentStreamSingleton) {
Graph<Application> applicationRegisterGraph = agentStreamSingleton.getApplicationRegisterGraph();
gRPCServer.addHandler(new ApplicationRegisterServiceHandler(cacheServiceManager, applicationRegisterGraph));
gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(daoService, cacheServiceManager));
gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(cacheServiceManager));
gRPCServer.addHandler(new JVMMetricsServiceHandler());
gRPCServer.addHandler(new TraceSegmentServiceHandler());
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithIntegerValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterServiceHandler extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);
private final ApplicationIDService applicationIDService;
public ApplicationRegisterServiceHandler(CacheServiceManager cacheServiceManager,
Graph<org.skywalking.apm.collector.storage.table.register.Application> applicationRegisterGraph) {
applicationIDService = new ApplicationIDService(cacheServiceManager, applicationRegisterGraph);
}
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
logger.debug("register application");
ProtocolStringList applicationCodes = request.getApplicationCodeList();
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = applicationIDService.getOrCreate(applicationCode);
if (applicationId != 0) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
builder.addApplication(value);
}
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.OSInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private final InstanceIDService instanceIDService;
public InstanceDiscoveryServiceHandler(DAOService daoService, CacheServiceManager cacheServiceManager) {
this.instanceIDService = new InstanceIDService(daoService, cacheServiceManager);
}
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private String buildOsInfo(OSInfo osinfo) {
JsonObject osInfoJson = new JsonObject();
osInfoJson.addProperty("osName", osinfo.getOsName());
osInfoJson.addProperty("hostName", osinfo.getHostname());
osInfoJson.addProperty("processId", osinfo.getProcessNo());
JsonArray ipv4Array = new JsonArray();
osinfo.getIpv4SList().forEach(ipv4 -> {
ipv4Array.add(ipv4);
});
osInfoJson.add("ipv4s", ipv4Array);
return osInfoJson.toString();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.ServiceNameMappingElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final ServiceNameService serviceNameService;
public ServiceNameDiscoveryServiceHandler(CacheServiceManager cacheServiceManager) {
this.serviceNameService = new ServiceNameService(cacheServiceManager);
}
@Override public void discovery(ServiceNameCollection request,
StreamObserver<ServiceNameMappingCollection> responseObserver) {
List<ServiceNameElement> serviceNameElementList = request.getElementsList();
ServiceNameMappingCollection.Builder builder = ServiceNameMappingCollection.newBuilder();
for (ServiceNameElement serviceNameElement : serviceNameElementList) {
int applicationId = serviceNameElement.getApplicationId();
String serviceName = serviceNameElement.getServiceName();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
if (serviceId != 0) {
ServiceNameMappingElement.Builder mappingElement = ServiceNameMappingElement.newBuilder();
mappingElement.setServiceId(serviceId);
mappingElement.setElement(serviceNameElement);
builder.addElements(mappingElement);
}
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
......@@ -37,7 +37,8 @@ public class AgentGRPCNamingHandler extends JettyHandler {
}
@Override public String pathSpec() {
return "/agent/gRPC";
// return "/agent/gRPC";
return "/agentstream/grpc";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.Test;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterServiceHandlerTestCase {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandlerTestCase.class);
private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub;
@Test
public void testRegister() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode("test141").build();
ApplicationMapping mapping = stub.register(application);
logger.debug(mapping.getApplication(0).getKey() + ", " + mapping.getApplication(0).getValue());
}
}
......@@ -23,6 +23,7 @@ import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -91,7 +92,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME};
}
private void addHandlers(DAOService daoService, Server jettyServer) {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
* @author peng-yongsheng
*/
public class AgentStreamSingleton {
private static AgentStreamSingleton INSTANCE;
private final ModuleManager moduleManager;
private final CacheServiceManager cacheServiceManager;
private final WorkerCreateListener workerCreateListener;
private Graph<Application> applicationRegisterGraph;
public AgentStreamSingleton(ModuleManager moduleManager, CacheServiceManager cacheServiceManager,
WorkerCreateListener workerCreateListener) throws ServiceNotProvidedException, ModuleNotFoundException {
this.moduleManager = moduleManager;
this.cacheServiceManager = cacheServiceManager;
this.workerCreateListener = workerCreateListener;
createJVMGraph();
createRegisterGraph();
createTraceGraph();
}
public static synchronized AgentStreamSingleton getInstance(ModuleManager moduleManager,
CacheServiceManager cacheServiceManager,
WorkerCreateListener workerCreateListener) throws ServiceNotProvidedException, ModuleNotFoundException {
if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager, cacheServiceManager, workerCreateListener);
}
return INSTANCE;
}
private void createJVMGraph() {
}
private void createRegisterGraph() throws ServiceNotProvidedException, ModuleNotFoundException {
RegisterStreamGraph registerStreamGraph = new RegisterStreamGraph(moduleManager, cacheServiceManager, workerCreateListener);
applicationRegisterGraph = registerStreamGraph.createApplicationRegisterGraph();
}
public Graph<Application> getApplicationRegisterGraph() {
return applicationRegisterGraph;
}
private void createTraceGraph() {
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationRegisterSerialWorker;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
* @author peng-yongsheng
*/
public class RegisterStreamGraph {
public static final int APPLICATION_REGISTER_GRAPH_ID = 200;
private final ModuleManager moduleManager;
private final CacheServiceManager cacheServiceManager;
private final WorkerCreateListener workerCreateListener;
public RegisterStreamGraph(ModuleManager moduleManager,
CacheServiceManager cacheServiceManager,
WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.cacheServiceManager = cacheServiceManager;
this.workerCreateListener = workerCreateListener;
}
public Graph<Application> createApplicationRegisterGraph() throws ModuleNotFoundException, ServiceNotProvidedException {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Graph<Application> graph = GraphManager.INSTANCE.createIfAbsent(APPLICATION_REGISTER_GRAPH_ID, Application.class);
graph.addNode(new ApplicationRegisterRemoteWorker.Factory(daoService, cacheServiceManager, remoteSenderService, APPLICATION_REGISTER_GRAPH_ID).create(workerCreateListener))
.addNext(new ApplicationRegisterSerialWorker.Factory(daoService, cacheServiceManager, queueCreatorService).create(workerCreateListener));
return graph;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
private final CacheServiceManager cacheServiceManager;
private final Graph<Application> applicationRegisterGraph;
public ApplicationIDService(CacheServiceManager cacheServiceManager, Graph<Application> applicationRegisterGraph) {
this.cacheServiceManager = cacheServiceManager;
this.applicationRegisterGraph = applicationRegisterGraph;
}
public int getOrCreate(String applicationCode) {
int applicationId = cacheServiceManager.getApplicationCacheService().get(applicationCode);
if (applicationId == 0) {
Application application = new Application(applicationCode);
application.setApplicationCode(applicationCode);
application.setApplicationId(0);
applicationRegisterGraph.start(application);
}
return applicationId;
}
}
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -40,7 +41,7 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker<Applic
}
@Override public int id() {
return 0;
return ApplicationRegisterRemoteWorker.class.hashCode();
}
@Override protected void onWork(Application message) throws WorkerException {
......@@ -48,11 +49,15 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker<Applic
onNext(message);
}
@Override public Selector selector() {
return Selector.ForeverFirst;
}
public static class Factory extends AbstractRemoteWorkerProvider<Application, Application, ApplicationRegisterRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override public ApplicationRegisterRemoteWorker workerInstance(DAOService daoService,
......
......@@ -43,7 +43,7 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Ap
}
@Override public int id() {
return 0;
return ApplicationRegisterSerialWorker.class.hashCode();
}
@Override protected void onWork(Application application) throws WorkerException {
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.storage.dao.IInstanceStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
private final DAOService daoService;
private final CacheServiceManager cacheServiceManager;
public InstanceIDService(DAOService daoService, CacheServiceManager cacheServiceManager) {
this.daoService = daoService;
this.cacheServiceManager = cacheServiceManager;
}
public int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo) {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
int instanceId = cacheServiceManager.getInstanceCacheService().getInstanceId(applicationId, agentUUID);
if (instanceId == 0) {
Instance instance = new Instance("0");
instance.setApplicationId(applicationId);
instance.setAgentUUID(agentUUID);
instance.setRegisterTime(registerTime);
instance.setHeartBeatTime(registerTime);
instance.setInstanceId(0);
instance.setOsInfo(osInfo);
}
return instanceId;
}
public void recover(int instanceId, int applicationId, long registerTime, String osInfo) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceStreamDAO dao = (IInstanceStreamDAO)daoService.get(IInstanceStreamDAO.class);
Instance instance = new Instance(String.valueOf(instanceId));
instance.setApplicationId(applicationId);
instance.setAgentUUID("");
instance.setRegisterTime(registerTime);
instance.setHeartBeatTime(registerTime);
instance.setInstanceId(instanceId);
instance.setOsInfo(osInfo);
dao.save(instance);
}
}
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -36,7 +37,7 @@ public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker<Instance,
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterRemoteWorker.class);
@Override public int id() {
return 0;
return InstanceRegisterRemoteWorker.class.hashCode();
}
public InstanceRegisterRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
......@@ -48,11 +49,15 @@ public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker<Instance,
onNext(instance);
}
@Override public Selector selector() {
return Selector.ForeverFirst;
}
public static class Factory extends AbstractRemoteWorkerProvider<Instance, Instance, InstanceRegisterRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override
......
......@@ -41,7 +41,7 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker<Insta
}
@Override public int id() {
return 0;
return InstanceRegisterSerialWorker.class.hashCode();
}
@Override protected void onWork(Instance instance) throws WorkerException {
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -40,18 +41,22 @@ public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker<Servic
}
@Override public int id() {
return 0;
return ServiceNameRegisterRemoteWorker.class.hashCode();
}
@Override protected void onWork(ServiceName serviceName) throws WorkerException {
onNext(serviceName);
}
@Override public Selector selector() {
return Selector.ForeverFirst;
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceName, ServiceName, ServiceNameRegisterRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override public ServiceNameRegisterRemoteWorker workerInstance(DAOService daoService,
......
......@@ -43,7 +43,7 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
}
@Override public int id() {
return 0;
return ServiceNameRegisterSerialWorker.class.hashCode();
}
@Override protected void onWork(ServiceName serviceName) throws WorkerException {
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -36,17 +37,21 @@ public class NodeComponentRemoteWorker extends AbstractRemoteWorker<NodeComponen
}
@Override public int id() {
return 0;
return NodeComponentRemoteWorker.class.hashCode();
}
@Override protected void onWork(NodeComponent nodeComponent) throws WorkerException {
onNext(nodeComponent);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeComponent, NodeComponent, NodeComponentRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -36,17 +37,21 @@ public class NodeMappingRemoteWorker extends AbstractRemoteWorker<NodeMapping, N
}
@Override public int id() {
return 0;
return NodeMappingRemoteWorker.class.hashCode();
}
@Override protected void onWork(NodeMapping nodeMapping) throws WorkerException {
onNext(nodeMapping);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeMapping, NodeMapping, NodeMappingRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.noderef;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -36,17 +37,21 @@ public class NodeReferenceRemoteWorker extends AbstractRemoteWorker<NodeReferenc
}
@Override public int id() {
return 0;
return NodeReferenceRemoteWorker.class.hashCode();
}
@Override protected void onWork(NodeReference nodeReference) throws WorkerException {
onNext(nodeReference);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeReference, NodeReference, NodeReferenceRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -36,17 +37,21 @@ public class ServiceEntryRemoteWorker extends AbstractRemoteWorker<ServiceEntry,
}
@Override public int id() {
return 0;
return ServiceEntryRemoteWorker.class.hashCode();
}
@Override protected void onWork(ServiceEntry serviceEntry) throws WorkerException {
onNext(serviceEntry);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceEntry, ServiceEntry, ServiceEntryRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
......@@ -36,17 +37,21 @@ public class ServiceReferenceRemoteWorker extends AbstractRemoteWorker<ServiceRe
}
@Override public int id() {
return 0;
return ServiceReferenceRemoteWorker.class.hashCode();
}
@Override protected void onWork(ServiceReference serviceReference) throws WorkerException {
onNext(serviceReference);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceReference, ServiceReference, ServiceReferenceRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager, remoteSenderService, graphId);
}
@Override
......
......@@ -22,6 +22,9 @@ import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
/**
* @author peng-yongsheng
......@@ -37,33 +40,22 @@ public class CacheServiceManager {
return applicationCacheService;
}
public void setApplicationCacheService(
ApplicationCacheService applicationCacheService) {
this.applicationCacheService = applicationCacheService;
}
public InstanceCacheService getInstanceCacheService() {
return instanceCacheService;
}
public void setInstanceCacheService(InstanceCacheService instanceCacheService) {
this.instanceCacheService = instanceCacheService;
}
public ServiceIdCacheService getServiceIdCacheService() {
return serviceIdCacheService;
}
public void setServiceIdCacheService(ServiceIdCacheService serviceIdCacheService) {
this.serviceIdCacheService = serviceIdCacheService;
}
public ServiceNameCacheService getServiceNameCacheService() {
return serviceNameCacheService;
}
public void setServiceNameCacheService(
ServiceNameCacheService serviceNameCacheService) {
this.serviceNameCacheService = serviceNameCacheService;
public void init(ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException {
this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
this.instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
this.serviceIdCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class);
this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
}
}
......@@ -38,8 +38,8 @@ public abstract class Data extends EndOfBatchQueueMessage {
public Data(String id, Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
Column[] integerColumns, Column[] booleanColumns, Column[] byteColumns) {
super(id);
this.dataStrings[0] = id;
this.dataStrings = new String[stringColumns.length];
this.dataStrings[0] = id;
this.dataLongs = new Long[longColumns.length];
this.dataDoubles = new Double[doubleColumns.length];
this.dataIntegers = new Integer[integerColumns.length];
......
......@@ -4,6 +4,12 @@ cluster:
user_name: sa
cache:
guava:
queue:
disruptor:
remote:
gRPC:
host: localhost
port: 11800
naming:
jetty:
host: localhost
......
......@@ -20,13 +20,21 @@ package org.skywalking.apm.collector.queue.disruptor.service;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.queue.disruptor.base.DisruptorQueueCreator;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class DisruptorQueueCreatorService implements QueueCreatorService {
private final DisruptorQueueCreator creator;
public DisruptorQueueCreatorService() {
this.creator = new DisruptorQueueCreator();
}
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
return null;
return creator.create(queueSize, executor);
}
}
......@@ -20,7 +20,6 @@ package org.skywalking.apm.collector.remote;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.RemoteSerializeService;
import org.skywalking.apm.collector.remote.service.RemoteServerService;
/**
......@@ -35,6 +34,6 @@ public class RemoteModule extends Module {
}
@Override public Class[] services() {
return new Class[] {RemoteServerService.class, RemoteSerializeService.class, RemoteSenderService.class};
return new Class[] {RemoteServerService.class, RemoteSenderService.class};
}
}
......@@ -23,6 +23,10 @@ import org.skywalking.apm.collector.core.data.Data;
/**
* @author peng-yongsheng
*/
public interface RemoteClient {
public interface RemoteClient extends Comparable<RemoteClient> {
String getAddress();
void send(int graphId, int nodeId, Data data);
boolean equals(String address);
}
......@@ -18,9 +18,11 @@
package org.skywalking.apm.collector.remote.service;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface RemoteClientService {
public interface RemoteClientService extends Service {
RemoteClient create(String host, int port);
}
......@@ -25,5 +25,9 @@ import org.skywalking.apm.collector.core.module.Service;
* @author peng-yongsheng
*/
public interface RemoteSenderService extends Service {
void send(int graph, int nodeId, Data data);
Mode send(int graphId, int nodeId, Data data, Selector selector);
enum Mode {
Remote, Local
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.service;
/**
* @author peng-yongsheng
*/
public enum Selector {
HashCode, Rolling, ForeverFirst
}
......@@ -39,7 +39,10 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
remoteSenderService = new GRPCRemoteSenderService();
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
remoteSenderService = new GRPCRemoteSenderService(host, port);
this.registerServiceImplementation(RemoteServerService.class, new GRPCRemoteServerService(listener));
this.registerServiceImplementation(RemoteSenderService.class, remoteSenderService);
}
......
......@@ -30,12 +30,18 @@ public class GRPCRemoteClient implements RemoteClient {
private final GRPCRemoteSerializeService service;
private final StreamObserver<RemoteMessage> streamObserver;
private final String address;
public GRPCRemoteClient(StreamObserver<RemoteMessage> streamObserver) {
public GRPCRemoteClient(String host, int port, StreamObserver<RemoteMessage> streamObserver) {
this.address = host + ":" + String.valueOf(port);
this.streamObserver = streamObserver;
this.service = new GRPCRemoteSerializeService();
}
@Override public final String getAddress() {
return this.address;
}
@Override public void send(int graphId, int nodeId, Data data) {
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setGraphId(graphId);
......@@ -44,4 +50,12 @@ public class GRPCRemoteClient implements RemoteClient {
streamObserver.onNext(builder.build());
}
@Override public boolean equals(String address) {
return this.address.equals(address);
}
@Override public int compareTo(RemoteClient o) {
return this.address.compareTo(o.getAddress());
}
}
......@@ -45,7 +45,7 @@ public class GRPCRemoteClientService implements RemoteClientService {
}
RemoteCommonServiceGrpc.RemoteCommonServiceStub stub = RemoteCommonServiceGrpc.newStub(client.getChannel());
StreamObserver<RemoteMessage> streamObserver = createStreamObserver(stub);
return new GRPCRemoteClient(streamObserver);
return new GRPCRemoteClient(host, port, streamObserver);
}
private StreamObserver<RemoteMessage> createStreamObserver(RemoteCommonServiceGrpc.RemoteCommonServiceStub stub) {
......
......@@ -18,14 +18,20 @@
package org.skywalking.apm.collector.remote.grpc.service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider;
import org.skywalking.apm.collector.remote.grpc.service.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.remote.grpc.service.selector.HashCodeSelector;
import org.skywalking.apm.collector.remote.grpc.service.selector.RollingSelector;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
/**
* @author peng-yongsheng
......@@ -34,33 +40,75 @@ public class GRPCRemoteSenderService extends ClusterModuleListener implements Re
public static final String PATH = "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME;
private final GRPCRemoteClientService service;
private final Map<String, RemoteClient> clientMap;
private List<RemoteClient> remoteClients;
private final String selfAddress;
private final HashCodeSelector hashCodeSelector;
private final ForeverFirstSelector foreverFirstSelector;
private final RollingSelector rollingSelector;
@Override public void send(int graph, int nodeId, Data data) {
@Override public Mode send(int graphId, int nodeId, Data data, Selector selector) {
RemoteClient remoteClient;
switch (selector) {
case HashCode:
remoteClient = hashCodeSelector.select(remoteClients, data);
return sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data);
case Rolling:
remoteClient = rollingSelector.select(remoteClients, data);
return sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data);
case ForeverFirst:
remoteClient = foreverFirstSelector.select(remoteClients, data);
return sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data);
}
throw new UnexpectedException("Selector not match, Just support hash, rolling, forever first selector.");
}
private Mode sendToRemoteWhenNotSelf(RemoteClient remoteClient, int graphId, int nodeId, Data data) {
if (remoteClient.equals(selfAddress)) {
return Mode.Local;
} else {
remoteClient.send(graphId, nodeId, data);
return Mode.Remote;
}
}
public GRPCRemoteSenderService() {
public GRPCRemoteSenderService(String host, int port) {
this.service = new GRPCRemoteClientService();
this.clientMap = new ConcurrentHashMap<>();
this.remoteClients = new ArrayList<>();
this.selfAddress = host + ":" + String.valueOf(port);
this.hashCodeSelector = new HashCodeSelector();
this.foreverFirstSelector = new ForeverFirstSelector();
this.rollingSelector = new RollingSelector();
}
@Override public String path() {
return PATH;
}
@Override public void serverJoinNotify(String serverAddress) {
if (!clientMap.containsKey(serverAddress)) {
String host = serverAddress.split(":")[0];
int port = Integer.parseInt(serverAddress.split(":")[1]);
RemoteClient remoteClient = service.create(host, port);
clientMap.put(serverAddress, remoteClient);
}
@Override public synchronized void serverJoinNotify(String serverAddress) {
List<RemoteClient> newRemoteClients = new ArrayList<>();
newRemoteClients.addAll(remoteClients);
String host = serverAddress.split(":")[0];
int port = Integer.parseInt(serverAddress.split(":")[1]);
RemoteClient remoteClient = service.create(host, port);
newRemoteClients.add(remoteClient);
Collections.sort(newRemoteClients);
this.remoteClients = newRemoteClients;
}
@Override public void serverQuitNotify(String serverAddress) {
if (clientMap.containsKey(serverAddress)) {
clientMap.remove(serverAddress);
@Override public synchronized void serverQuitNotify(String serverAddress) {
List<RemoteClient> newRemoteClients = new ArrayList<>();
newRemoteClients.addAll(remoteClients);
for (int i = newRemoteClients.size() - 1; i >= 0; i--) {
RemoteClient remoteClient = newRemoteClients.get(i);
if (remoteClient.equals(serverAddress)) {
newRemoteClients.remove(i);
}
}
this.remoteClients = newRemoteClients;
}
}
......@@ -19,6 +19,7 @@
package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -30,7 +31,7 @@ public class ForeverFirstSelector implements RemoteClientSelector {
private final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
@Override public RemoteClient select(List<RemoteClient> clients, Object message) {
@Override public RemoteClient select(List<RemoteClient> clients, Data message) {
logger.debug("clients size: {}", clients.size());
return clients.get(0);
}
......
......@@ -19,7 +19,7 @@
package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List;
import org.skywalking.apm.collector.core.data.AbstractHashMessage;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClient;
/**
......@@ -27,14 +27,9 @@ import org.skywalking.apm.collector.remote.service.RemoteClient;
*/
public class HashCodeSelector implements RemoteClientSelector {
@Override public RemoteClient select(List<RemoteClient> clients, Object message) {
if (message instanceof AbstractHashMessage) {
AbstractHashMessage hashMessage = (AbstractHashMessage)message;
int size = clients.size();
int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return clients.get(selectIndex);
} else {
throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage, the message object class is: " + message.getClass().getName());
}
@Override public RemoteClient select(List<RemoteClient> clients, Data message) {
int size = clients.size();
int selectIndex = Math.abs(message.getHashCode()) % size;
return clients.get(selectIndex);
}
}
......@@ -19,11 +19,12 @@
package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClient;
/**
* @author peng-yongsheng
*/
public interface RemoteClientSelector {
RemoteClient select(List<RemoteClient> clients, Object message);
RemoteClient select(List<RemoteClient> clients, Data message);
}
......@@ -19,6 +19,7 @@
package org.skywalking.apm.collector.remote.grpc.service.selector;
import java.util.List;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClient;
/**
......@@ -28,7 +29,7 @@ public class RollingSelector implements RemoteClientSelector {
private int index = 0;
@Override public RemoteClient select(List<RemoteClient> clients, Object message) {
@Override public RemoteClient select(List<RemoteClient> clients, Data message) {
int size = clients.size();
index++;
int selectIndex = Math.abs(index) % size;
......
......@@ -40,7 +40,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT, OUTPUT, WORKER_TYP
}
@Override
public final WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException {
public final WorkerRef create(WorkerCreateListener workerCreateListener) {
WORKER_TYPE localAsyncWorker = workerInstance(getDaoService(), getCacheServiceManager());
workerCreateListener.addWorker(localAsyncWorker);
QueueEventHandler<INPUT> queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
......
......@@ -20,6 +20,7 @@ package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
......@@ -37,13 +38,15 @@ public abstract class AbstractRemoteWorker<INPUT extends Data, OUTPUT extends Da
super(daoService, cacheServiceManager);
}
public abstract Selector selector();
/**
* This method use for message producer to call for send message.
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(INPUT)}
* @throws Exception The Exception happen in {@link #onWork(Object)} )}
*/
final public void allocateJob(INPUT message) {
public final void allocateJob(INPUT message) {
try {
onWork(message);
} catch (WorkerException e) {
......
......@@ -20,7 +20,7 @@ package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
......@@ -33,12 +33,14 @@ import org.skywalking.apm.collector.storage.service.DAOService;
*/
public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT extends Data, WORKER_TYPE extends AbstractRemoteWorker<INPUT, OUTPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE> {
private final RemoteClientService remoteClientService;
private final RemoteSenderService remoteSenderService;
private final int graphId;
public AbstractRemoteWorkerProvider(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
RemoteSenderService remoteSenderService, int graphId) {
super(daoService, cacheServiceManager);
this.remoteClientService = remoteClientService;
this.remoteSenderService = remoteSenderService;
this.graphId = graphId;
}
/**
......@@ -48,15 +50,9 @@ public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT ex
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override final public WorkerRef create(WorkerCreateListener workerCreateListener) {
@Override final public RemoteWorkerRef create(WorkerCreateListener workerCreateListener) {
WORKER_TYPE remoteWorker = workerInstance(getDaoService(), getCacheServiceManager());
workerCreateListener.addWorker(remoteWorker);
RemoteWorkerRef<INPUT, OUTPUT> workerRef = new RemoteWorkerRef<>(remoteWorker);
return workerRef;
}
public final RemoteWorkerRef create(String host, int port) {
RemoteWorkerRef<INPUT, OUTPUT> workerRef = new RemoteWorkerRef<>(null, remoteClientService.create(host, port));
return workerRef;
return new RemoteWorkerRef<>(remoteWorker, remoteSenderService, graphId);
}
}
......@@ -19,8 +19,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.remote.service.RemoteClient;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -31,45 +30,30 @@ public class RemoteWorkerRef<INPUT extends Data, OUTPUT extends Data> extends Wo
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
private final Boolean acrossJVM;
private final AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker;
private final RemoteClient remoteClient;
private final RemoteSenderService remoteSenderService;
private final int graphId;
public RemoteWorkerRef(AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker) {
public RemoteWorkerRef(AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker, RemoteSenderService remoteSenderService,
int graphId) {
super(remoteWorker);
this.remoteWorker = remoteWorker;
this.acrossJVM = false;
this.remoteClient = null;
}
public RemoteWorkerRef(AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker, RemoteClient remoteClient) {
super(remoteWorker);
this.remoteWorker = remoteWorker;
this.acrossJVM = true;
this.remoteClient = remoteClient;
this.remoteSenderService = remoteSenderService;
this.graphId = graphId;
}
@Override protected void in(INPUT message) {
if (acrossJVM) {
try {
GraphManager.INSTANCE.findGraph(1);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
try {
RemoteSenderService.Mode mode = remoteSenderService.send(this.graphId, this.remoteWorker.id(), message, this.remoteWorker.selector());
if (mode.equals(RemoteSenderService.Mode.Local)) {
out(message);
}
} else {
remoteWorker.allocateJob(message);
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
@Override protected void out(INPUT input) {
super.out(input);
}
private Boolean isAcrossJVM() {
return acrossJVM;
}
@Override public String toString() {
return "acrossJVM: " + isAcrossJVM();
}
}
......@@ -21,10 +21,6 @@ package org.skywalking.apm.collector.ui.jetty;
import java.util.Properties;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -92,7 +88,8 @@ public class UIModuleJettyProvider extends ModuleProvider {
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new UIJettyNamingHandler(namingListener));
CacheServiceManager cacheServiceManager = initCacheServiceManager();
CacheServiceManager cacheServiceManager = new CacheServiceManager();
cacheServiceManager.init(getManager());
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
......@@ -127,18 +124,4 @@ public class UIModuleJettyProvider extends ModuleProvider {
jettyServer.addHandler(new TraceDagGetHandler(daoService, cacheServiceManager));
jettyServer.addHandler(new TraceStackGetHandler(daoService, cacheServiceManager));
}
private CacheServiceManager initCacheServiceManager() throws ModuleNotFoundException, ServiceNotProvidedException {
ApplicationCacheService applicationCacheService = getManager().find(CacheModule.NAME).getService(ApplicationCacheService.class);
InstanceCacheService instanceCacheService = getManager().find(CacheModule.NAME).getService(InstanceCacheService.class);
ServiceIdCacheService serviceIdCacheService = getManager().find(CacheModule.NAME).getService(ServiceIdCacheService.class);
ServiceNameCacheService serviceNameCacheService = getManager().find(CacheModule.NAME).getService(ServiceNameCacheService.class);
CacheServiceManager serviceManager = new CacheServiceManager();
serviceManager.setApplicationCacheService(applicationCacheService);
serviceManager.setInstanceCacheService(instanceCacheService);
serviceManager.setServiceIdCacheService(serviceIdCacheService);
serviceManager.setServiceNameCacheService(serviceNameCacheService);
return serviceManager;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册