提交 7d73fba8 编写于 作者: P peng-yongsheng

Network address register by http json and grpc.

上级 3b6c6843
......@@ -23,6 +23,7 @@ import org.apache.skywalking.apm.collector.agent.grpc.define.AgentGRPCModule;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ApplicationRegisterServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.InstanceDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.JVMMetricsServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.NetworkAddressRegisterServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ServiceNameDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingHandler;
......@@ -95,5 +96,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new JVMMetricsServiceHandler(getManager()));
gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
gRPCServer.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.apache.skywalking.apm.network.proto.KeyWithIntegerValue;
import org.apache.skywalking.apm.network.proto.NetworkAddressMappings;
import org.apache.skywalking.apm.network.proto.NetworkAddressRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.NetworkAddresses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NetworkAddressRegisterServiceHandler extends NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(NetworkAddressRegisterServiceHandler.class);
private final INetworkAddressIDService networkAddressIDService;
public NetworkAddressRegisterServiceHandler(ModuleManager moduleManager) {
this.networkAddressIDService = moduleManager.find(AnalysisRegisterModule.NAME).getService(INetworkAddressIDService.class);
}
@Override
public void batchRegister(NetworkAddresses request, StreamObserver<NetworkAddressMappings> responseObserver) {
logger.debug("register application");
ProtocolStringList addressesList = request.getAddressesList();
NetworkAddressMappings.Builder builder = NetworkAddressMappings.newBuilder();
for (int i = 0; i < addressesList.size(); i++) {
String networkAddress = addressesList.get(i);
int addressId = networkAddressIDService.getOrCreate(networkAddress);
if (addressId != 0) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(networkAddress).setValue(addressId).build();
builder.addAddressIds(value);
}
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
......@@ -22,6 +22,7 @@ import java.util.Properties;
import org.apache.skywalking.apm.collector.agent.jetty.define.AgentJettyModule;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ApplicationRegisterServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.InstanceDiscoveryServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.NetworkAddressRegisterServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ServiceNameDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.TraceSegmentServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.naming.AgentJettyNamingHandler;
......@@ -93,5 +94,6 @@ public class AgentModuleJettyProvider extends ModuleProvider {
jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager()));
jettyServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
jettyServer.addHandler(new NetworkAddressRegisterServletHandler(getManager()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.agent.jetty.provider.handler;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NetworkAddressRegisterServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(NetworkAddressRegisterServletHandler.class);
private final INetworkAddressIDService networkAddressIDService;
private Gson gson = new Gson();
private static final String NETWORK_ADDRESS = "n";
private static final String ADDRESS_ID = "i";
public NetworkAddressRegisterServletHandler(ModuleManager moduleManager) {
this.networkAddressIDService = moduleManager.find(AnalysisRegisterModule.NAME).getService(INetworkAddressIDService.class);
}
@Override public String pathSpec() {
return "/networkAddress/register";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}
@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray networkAddresses = gson.fromJson(req.getReader(), JsonArray.class);
for (int i = 0; i < networkAddresses.size(); i++) {
String networkAddress = networkAddresses.get(i).getAsString();
logger.debug("network address register, network address: {}", networkAddress);
int addressId = networkAddressIDService.getOrCreate(networkAddress);
JsonObject mapping = new JsonObject();
mapping.addProperty(ADDRESS_ID, addressId);
mapping.addProperty(NETWORK_ADDRESS, networkAddress);
responseArray.add(mapping);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.analysis.register.define;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
import org.apache.skywalking.apm.collector.core.module.Module;
......@@ -35,6 +36,6 @@ public class AnalysisRegisterModule extends Module {
}
@Override public Class[] services() {
return new Class[] {IApplicationIDService.class, IInstanceIDService.class, IServiceNameService.class};
return new Class[] {IApplicationIDService.class, IInstanceIDService.class, IServiceNameService.class, INetworkAddressIDService.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.register.define.service;
import org.apache.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface INetworkAddressIDService extends Service {
int getOrCreate(String networkAddress);
}
......@@ -22,6 +22,7 @@ import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
import org.apache.skywalking.apm.collector.analysis.register.provider.register.ApplicationRegisterGraph;
import org.apache.skywalking.apm.collector.analysis.register.provider.register.InstanceRegisterGraph;
......@@ -29,6 +30,7 @@ import org.apache.skywalking.apm.collector.analysis.register.provider.register.N
import org.apache.skywalking.apm.collector.analysis.register.provider.register.ServiceNameRegisterGraph;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.ApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.InstanceIDService;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.NetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.ServiceNameService;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
......@@ -58,6 +60,7 @@ public class AnalysisRegisterModuleProvider extends ModuleProvider {
this.registerServiceImplementation(IApplicationIDService.class, new ApplicationIDService(getManager()));
this.registerServiceImplementation(IInstanceIDService.class, new InstanceIDService(getManager()));
this.registerServiceImplementation(IServiceNameService.class, new ServiceNameService(getManager()));
this.registerServiceImplementation(INetworkAddressIDService.class, new NetworkAddressIDService(getManager()));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.register.provider.service;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheService;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NetworkAddressIDService implements INetworkAddressIDService {
private final Logger logger = LoggerFactory.getLogger(NetworkAddressIDService.class);
private final ModuleManager moduleManager;
private NetworkAddressCacheService networkAddressCacheService;
private Graph<NetworkAddress> networkAddressGraph;
public NetworkAddressIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
private NetworkAddressCacheService getNetworkAddressCacheService() {
if (ObjectUtils.isEmpty(networkAddressCacheService)) {
networkAddressCacheService = moduleManager.find(CacheModule.NAME).getService(NetworkAddressCacheService.class);
}
return networkAddressCacheService;
}
private Graph<NetworkAddress> getNetworkAddressGraph() {
if (ObjectUtils.isEmpty(networkAddressGraph)) {
this.networkAddressGraph = GraphManager.INSTANCE.findGraph(GraphIdDefine.NETWORK_ADDRESS_NAME_REGISTER_GRAPH_ID, NetworkAddress.class);
}
return networkAddressGraph;
}
@Override public int getOrCreate(String networkAddress) {
int addressId = getNetworkAddressCacheService().getAddressId(networkAddress);
if (addressId == 0) {
NetworkAddress newNetworkAddress = new NetworkAddress("0");
newNetworkAddress.setNetworkAddress(networkAddress);
newNetworkAddress.setAddressId(0);
getNetworkAddressGraph().start(newNetworkAddress);
}
return addressId;
}
}
......@@ -58,6 +58,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IMemoryPoolMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.INetworkAddressCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.INetworkAddressRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentCostUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
......@@ -109,6 +110,7 @@ public class StorageModule extends Module {
classes.add(IApplicationRegisterDAO.class);
classes.add(IInstanceRegisterDAO.class);
classes.add(IServiceNameRegisterDAO.class);
classes.add(INetworkAddressRegisterDAO.class);
}
private void addPersistenceDAO(List<Class> classes) {
......
......@@ -220,6 +220,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
}
private void registerRegisterDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(INetworkAddressRegisterDAO.class, new NetworkAddressRegisterEsDAO(elasticSearchClient));
this.registerServiceImplementation(IApplicationRegisterDAO.class, new ApplicationEsRegisterDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceRegisterDAO.class, new InstanceEsRegisterDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceNameRegisterDAO.class, new ServiceNameEsRegisterDAO(elasticSearchClient));
......@@ -249,8 +250,6 @@ public class StorageModuleEsProvider extends ModuleProvider {
}
private void registerUiDAO() throws ServiceNotProvidedException {
this.registerServiceImplementation(INetworkAddressRegisterDAO.class, new NetworkAddressRegisterEsDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceUIDAO.class, new InstanceEsUIDAO(elasticSearchClient));
this.registerServiceImplementation(ICpuMetricUIDAO.class, new CpuMetricEsUIDAO(elasticSearchClient));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册