提交 c1a47d4d 编写于 作者: P pengys5

service name register finish

上级 54fe8e43
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentregister.servicename.ServiceNameService;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameElement;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
import org.skywalking.apm.network.proto.ServiceNameMappingElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private ServiceNameService serviceNameService = new ServiceNameService();
@Override public void discovery(ServiceNameCollection request,
StreamObserver<ServiceNameMappingCollection> responseObserver) {
super.discovery(request, responseObserver);
List<ServiceNameElement> serviceNameElementList = request.getElementsList();
ServiceNameMappingCollection.Builder builder = ServiceNameMappingCollection.newBuilder();
for (ServiceNameElement serviceNameElement : serviceNameElementList) {
int applicationId = serviceNameElement.getApplicationId();
String serviceName = serviceNameElement.getServiceName();
int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
ServiceNameMappingElement.Builder mappingElement = ServiceNameMappingElement.newBuilder();
mappingElement.setServiceId(serviceId);
mappingElement.setElement(serviceNameElement);
builder.addElements(mappingElement);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
}
package org.skywalking.apm.collector.agentregister.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameService {
private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class);
public int getOrCreate(int applicationId, String serviceName) {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
int serviceId = dao.getServiceId(applicationId, serviceName);
if (serviceId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
ServiceNameDataDefine.ServiceName service = new ServiceNameDataDefine.ServiceName("0", serviceName, applicationId, 0);
try {
context.getClusterWorkerContext().lookup(ServiceNameRegisterRemoteWorker.WorkerRole.INSTANCE).tell(service);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
return serviceId;
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class ServiceNameDataDefine extends DataDefine {
public static final int DEFINE_ID = 103;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 4;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(ServiceNameTable.COLUMN_SERVICE_NAME, AttributeType.STRING, new CoverOperation()));
addAttribute(2, new Attribute(ServiceNameTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(3, new Attribute(ServiceNameTable.COLUMN_SERVICE_ID, AttributeType.INTEGER, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String serviceName = remoteData.getDataStrings(1);
int applicationId = remoteData.getDataIntegers(0);
int serviceId = remoteData.getDataIntegers(1);
return new ServiceName(id, serviceName, applicationId, serviceId);
}
@Override public RemoteData serialize(Object object) {
ServiceName serviceName = (ServiceName)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(serviceName.getId());
builder.addDataStrings(serviceName.getServiceName());
builder.addDataIntegers(serviceName.getApplicationId());
builder.addDataIntegers(serviceName.getServiceId());
return builder.build();
}
public static class ServiceName {
private String id;
private String serviceName;
private int applicationId;
private int serviceId;
public ServiceName(String id, String serviceName, int applicationId, int serviceId) {
this.id = id;
this.serviceName = serviceName;
this.applicationId = applicationId;
this.serviceId = serviceId;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
public int getApplicationId() {
return applicationId;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
public int getServiceId() {
return serviceId;
}
public void setServiceId(int serviceId) {
this.serviceId = serviceId;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class ServiceNameEsTableDefine extends ElasticSearchTableDefine {
public ServiceNameEsTableDefine() {
super(ServiceNameTable.TABLE);
}
@Override public int refreshInterval() {
return 0;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class ServiceNameH2TableDefine extends H2TableDefine {
public ServiceNameH2TableDefine() {
super(ServiceNameTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker {
private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterRemoteWorker.class);
protected ServiceNameRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
ServiceNameDataDefine.ServiceName serviceName = (ServiceNameDataDefine.ServiceName)message;
logger.debug("service name: {}", serviceName.getServiceName());
getClusterContext().lookup(ServiceNameRegisterSerialWorker.WorkerRole.INSTANCE).tell(serviceName);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceNameRegisterRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceNameRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceNameRegisterRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceNameRegisterRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceNameDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterSerialWorker.class);
public ServiceNameRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onWork(Object message) throws WorkerException {
if (message instanceof ServiceNameDataDefine.ServiceName) {
ServiceNameDataDefine.ServiceName serviceName = (ServiceNameDataDefine.ServiceName)message;
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
int min = dao.getMinServiceId();
if (min == 0) {
serviceName.setServiceId(1);
serviceName.setId("1");
} else {
int max = dao.getMaxServiceId();
int serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
serviceName.setApplicationId(serviceId);
serviceName.setId(String.valueOf(serviceId));
}
dao.save(serviceName);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceNameRegisterSerialWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceNameRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceNameRegisterSerialWorker(role(), clusterContext);
}
@Override public int queueSize() {
return 256;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceNameRegisterSerialWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceNameDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
/**
* @author pengys5
*/
public class ServiceNameTable {
public static final String TABLE = "service_name";
public static final String COLUMN_SERVICE_NAME = "service_name";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_SERVICE_ID = "service_id";
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
/**
* @author pengys5
*/
public interface IServiceNameDAO {
int getServiceId(int applicationId, String serviceName);
int getMaxServiceId();
int getMinServiceId();
void save(ServiceNameDataDefine.ServiceName serviceName);
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceNameEsDAO.class);
@Override public int getServiceId(int applicationId, String serviceName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ServiceNameTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int serviceId = (int)searchHit.getSource().get(ServiceNameTable.COLUMN_SERVICE_ID);
return serviceId;
}
return 0;
}
@Override public int getMaxServiceId() {
return getMaxId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
}
@Override public int getMinServiceId() {
return getMinId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
}
@Override public void save(ServiceNameDataDefine.ServiceName serviceName) {
logger.debug("save service name register info, application id: {}, service name: {}", serviceName.getApplicationId(), serviceName.getServiceName());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap();
source.put(ServiceNameTable.COLUMN_SERVICE_ID, serviceName.getServiceId());
source.put(ServiceNameTable.COLUMN_APPLICATION_ID, serviceName.getApplicationId());
source.put(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName.getServiceName());
IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, serviceName.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save service name register info, application id: {}, service name: {}, status: {}", serviceName.getApplicationId(), serviceName.getServiceName(), response.status().name());
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class ServiceNameH2DAO extends H2DAO implements IServiceNameDAO {
@Override public int getServiceId(int applicationId, String serviceName) {
return 0;
}
@Override public int getMaxServiceId() {
return 0;
}
@Override public int getMinServiceId() {
return 0;
}
@Override public void save(ServiceNameDataDefine.ServiceName serviceName) {
}
}
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationH2DAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2DAO
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterSerialWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker$Factory
\ No newline at end of file
......@@ -7,4 +7,7 @@ org.skywalking.apm.collector.agentstream.worker.register.application.Application
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameH2TableDefine
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册