提交 f4d19399 编写于 作者: P pengys5

#419 add configuration in application.yml file, e.g. elastic search index...

#419 add configuration in application.yml file, e.g. elastic search index shard number, index replicas number.
上级 ca684bf8
......@@ -17,14 +17,6 @@ public class CpuMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(CpuMetricTable.COLUMN_USAGE_PERCENT, ElasticSearchColumnDefine.Type.Double.name()));
......
......@@ -17,14 +17,6 @@ public class GCMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_PHRASE, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class MemoryMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryMetricTable.COLUMN_IS_HEAP, ElasticSearchColumnDefine.Type.Boolean.name()));
......
......@@ -17,14 +17,6 @@ public class MemoryPoolMetricEsTableDefine extends ElasticSearchTableDefine {
return 1;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(MemoryPoolMetricTable.COLUMN_POOL_TYPE, ElasticSearchColumnDefine.Type.Integer.name()));
......
package org.skywalking.apm.collector.agentregister.worker.application;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
/**
* @author pengys5
......@@ -17,14 +17,6 @@ public class ApplicationEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class InstanceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENT_UUID, ElasticSearchColumnDefine.Type.Keyword.name()));
......
package org.skywalking.apm.collector.agentregister.worker.servicename;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
/**
* @author pengys5
......@@ -17,14 +17,6 @@ public class ServiceNameEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
......
package org.skywalking.apm.collector.agentstream.worker.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class ApplicationCache {
private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(1000).build();
public static int get(String applicationCode) {
int applicationId = 0;
try {
applicationId = CACHE.get(applicationCode, () -> {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
return dao.getApplicationId(applicationCode);
});
} catch (Throwable e) {
return applicationId;
}
if (applicationId == 0) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
applicationId = dao.getApplicationId(applicationCode);
if (applicationId != 0) {
CACHE.put(applicationCode, applicationId);
}
}
return applicationId;
}
}
package org.skywalking.apm.collector.agentstream.worker.config;
/**
* @author pengys5
*/
public class CacheSizeConfig {
public static class Cache {
public static class Analysis {
public static int SIZE = 1024;
}
public static class Persistence {
public static int SIZE = 5000;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.config;
/**
* @author pengys5
*/
public class WorkerConfig {
public static class WorkerNum {
public static class Node {
public static class NodeCompAgg {
public static int VALUE = 2;
}
public static class NodeMappingDayAgg {
public static int VALUE = 2;
}
public static class NodeMappingHourAgg {
public static int VALUE = 2;
}
public static class NodeMappingMinuteAgg {
public static int VALUE = 2;
}
}
public static class NodeRef {
public static class NodeRefDayAgg {
public static int VALUE = 2;
}
public static class NodeRefHourAgg {
public static int VALUE = 2;
}
public static class NodeRefMinuteAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumDayAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumHourAgg {
public static int VALUE = 2;
}
public static class NodeRefResSumMinuteAgg {
public static int VALUE = 2;
}
}
public static class GlobalTrace {
public static class GlobalTraceAgg {
public static int VALUE = 2;
}
}
}
public static class Queue {
public static class GlobalTrace {
public static class GlobalTraceAnalysis {
public static int SIZE = 1024;
}
}
public static class Segment {
public static class SegmentAnalysis {
public static int SIZE = 1024;
}
public static class SegmentCostAnalysis {
public static int SIZE = 4096;
}
public static class SegmentExceptionAnalysis {
public static int SIZE = 4096;
}
}
public static class Node {
public static class NodeCompAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeMappingMinuteAnalysis {
public static int SIZE = 1024;
}
}
public static class NodeRef {
public static class NodeRefDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefMinuteAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumDayAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumHourAnalysis {
public static int SIZE = 1024;
}
public static class NodeRefResSumMinuteAnalysis {
public static int SIZE = 1024;
}
}
}
}
......@@ -17,14 +17,6 @@ public class GlobalTraceEsTableDefine extends ElasticSearchTableDefine {
return 5;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
......
......@@ -17,14 +17,6 @@ public class InstPerformanceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class NodeComponentEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
......
......@@ -17,14 +17,6 @@ public class NodeMappingEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeMappingTable.COLUMN_ADDRESS_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class NodeReferenceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
package org.skywalking.apm.collector.agentstream.worker.register;
/**
* @author pengys5
*/
public enum IdAutoIncrement {
INSTANCE;
public int increment(int min, int max) {
int instanceId;
if (min == max) {
instanceId = -1;
} else if (min + max == 0) {
instanceId = max + 1;
} else if (min + max > 0) {
instanceId = min - 1;
} else if (max < 0) {
instanceId = 1;
} else {
instanceId = max + 1;
}
return instanceId;
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
/**
* @author pengys5
*/
public class ApplicationEsTableDefine extends ElasticSearchTableDefine {
public ApplicationEsTableDefine() {
super(ApplicationTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
/**
* @author pengys5
*/
public class ApplicationH2TableDefine extends H2TableDefine {
public ApplicationH2TableDefine() {
super(ApplicationTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterRemoteWorker.class);
protected ApplicationRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
ApplicationDataDefine.Application application = (ApplicationDataDefine.Application)message;
logger.debug("application code: {}", application.getApplicationCode());
getClusterContext().lookup(ApplicationRegisterSerialWorker.WorkerRole.INSTANCE).tell(application);
}
public static class Factory extends AbstractRemoteWorkerProvider<ApplicationRegisterRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ApplicationRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationRegisterRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ApplicationRegisterRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterSerialWorker.class);
public ApplicationRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onWork(Object message) throws WorkerException {
if (message instanceof ApplicationDataDefine.Application) {
ApplicationDataDefine.Application application = (ApplicationDataDefine.Application)message;
logger.debug("register application, application code: {}", application.getApplicationCode());
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
int applicationId = dao.getApplicationId(application.getApplicationCode());
if (applicationId == 0) {
int min = dao.getMinApplicationId();
if (min == 0) {
ApplicationDataDefine.Application userApplication = new ApplicationDataDefine.Application(String.valueOf(Const.USER_ID), Const.USER_CODE, Const.USER_ID);
dao.save(userApplication);
application.setApplicationId(-1);
application.setId("-1");
} else {
int max = dao.getMaxApplicationId();
applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application.setApplicationId(applicationId);
application.setId(String.valueOf(applicationId));
}
dao.save(application);
}
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ApplicationRegisterSerialWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ApplicationRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationRegisterSerialWorker(role(), clusterContext);
}
@Override public int queueSize() {
return 256;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ApplicationRegisterSerialWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
private final Logger logger = LoggerFactory.getLogger(ApplicationEsDAO.class);
@Override public int getApplicationId(String applicationCode) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ApplicationTable.COLUMN_APPLICATION_CODE, applicationCode));
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int applicationId = (int)searchHit.getSource().get(ApplicationTable.COLUMN_APPLICATION_ID);
return applicationId;
}
return 0;
}
@Override public int getMaxApplicationId() {
return getMaxId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
}
@Override public int getMinApplicationId() {
return getMinId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
}
@Override public void save(ApplicationDataDefine.Application application) {
logger.debug("save application register info, application id: {}, application code: {}", application.getApplicationId(), application.getApplicationCode());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap();
source.put(ApplicationTable.COLUMN_APPLICATION_CODE, application.getApplicationCode());
source.put(ApplicationTable.COLUMN_APPLICATION_ID, application.getApplicationId());
IndexResponse response = client.prepareIndex(ApplicationTable.TABLE, application.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save application register info, application id: {}, application code: {}, status: {}", application.getApplicationId(), application.getApplicationCode(), response.status().name());
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
@Override public int getApplicationId(String applicationCode) {
H2Client client = getClient();
return 100;
}
@Override public int getMaxApplicationId() {
return 0;
}
@Override public int getMinApplicationId() {
return 0;
}
@Override public void save(ApplicationDataDefine.Application application) {
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
/**
* @author pengys5
*/
public interface IApplicationDAO {
int getApplicationId(String applicationCode);
int getMaxApplicationId();
int getMinApplicationId();
void save(ApplicationDataDefine.Application application);
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
/**
* @author pengys5
*/
public class InstanceEsTableDefine extends ElasticSearchTableDefine {
public InstanceEsTableDefine() {
super(InstanceTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENT_UUID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_OS_INFO, ElasticSearchColumnDefine.Type.Keyword.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
/**
* @author pengys5
*/
public class InstanceH2TableDefine extends H2TableDefine {
public InstanceH2TableDefine() {
super(InstanceTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_AGENT_UUID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_OS_INFO, H2ColumnDefine.Type.Varchar.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker {
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterRemoteWorker.class);
protected InstanceRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
InstanceDataDefine.Instance instance = (InstanceDataDefine.Instance)message;
logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
getClusterContext().lookup(InstanceRegisterSerialWorker.WorkerRole.INSTANCE).tell(instance);
}
public static class Factory extends AbstractRemoteWorkerProvider<InstanceRegisterRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public InstanceRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new InstanceRegisterRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return InstanceRegisterRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new InstanceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterSerialWorker.class);
public InstanceRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onWork(Object message) throws WorkerException {
if (message instanceof InstanceDataDefine.Instance) {
InstanceDataDefine.Instance instance = (InstanceDataDefine.Instance)message;
logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int instanceId = dao.getInstanceId(instance.getApplicationId(), instance.getAgentUUID());
if (instanceId == 0) {
// int min = dao.getMinInstanceId();
// if (min == 0) {
// instance.setId("1");
// instance.setInstanceId(1);
// } else {
// int max = dao.getMaxInstanceId();
// instanceId = IdAutoIncrement.INSTANCE.increment(min, max);
// instance.setId(String.valueOf(instanceId));
// instance.setInstanceId(instanceId);
// }
int max = dao.getMaxInstanceId();
if (max == 0) {
instance.setId("1");
instance.setInstanceId(1);
} else {
instance.setId(String.valueOf(max + 1));
instance.setInstanceId(max + 1);
}
dao.save(instance);
}
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstanceRegisterSerialWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public InstanceRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) {
return new InstanceRegisterSerialWorker(role(), clusterContext);
}
@Override public int queueSize() {
return 256;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return InstanceRegisterSerialWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
/**
* @author pengys5
*/
public interface IInstanceDAO {
int getInstanceId(int applicationId, String agentUUID);
int getMaxInstanceId();
int getMinInstanceId();
void save(InstanceDataDefine.Instance instance);
void updateHeartbeatTime(int instanceId, long heartbeatTime);
int getApplicationId(int applicationInstanceId);
}
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceEsDAO.class);
@Override public int getInstanceId(int applicationId, String agentUUID) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(InstanceTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENT_UUID, agentUUID));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
return (int)searchHit.getSource().get(InstanceTable.COLUMN_INSTANCE_ID);
}
return 0;
}
@Override public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
@Override public int getMinInstanceId() {
return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
@Override public void save(InstanceDataDefine.Instance instance) {
logger.debug("save instance register info, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_INSTANCE_ID, instance.getInstanceId());
source.put(InstanceTable.COLUMN_APPLICATION_ID, instance.getApplicationId());
source.put(InstanceTable.COLUMN_AGENT_UUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_REGISTER_TIME, instance.getRegisterTime());
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, instance.getHeartBeatTime());
source.put(InstanceTable.COLUMN_OS_INFO, instance.getOsInfo());
IndexResponse response = client.prepareIndex(InstanceTable.TABLE, instance.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save instance register info, application id: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name());
}
@Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
ElasticSearchClient client = getClient();
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(InstanceTable.TABLE);
updateRequest.type("type");
updateRequest.id(String.valueOf(instanceId));
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, heartbeatTime);
updateRequest.doc(source);
client.update(updateRequest);
}
@Override public int getApplicationId(int applicationInstanceId) {
GetResponse response = getClient().prepareGet(InstanceTable.TABLE, String.valueOf(applicationInstanceId)).get();
if (response.isExists()) {
return (int)response.getSource().get(InstanceTable.COLUMN_APPLICATION_ID);
} else {
return 0;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class InstanceH2DAO extends H2DAO implements IInstanceDAO {
@Override public int getInstanceId(int applicationId, String agentUUID) {
return 0;
}
@Override public int getMaxInstanceId() {
return 0;
}
@Override public int getMinInstanceId() {
return 0;
}
@Override public void save(InstanceDataDefine.Instance instance) {
}
@Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
}
@Override public int getApplicationId(int applicationInstanceId) {
return 0;
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
/**
* @author pengys5
*/
public class ServiceNameEsTableDefine extends ElasticSearchTableDefine {
public ServiceNameEsTableDefine() {
super(ServiceNameTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
/**
* @author pengys5
*/
public class ServiceNameH2TableDefine extends H2TableDefine {
public ServiceNameH2TableDefine() {
super(ServiceNameTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.COLUMN_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker {
private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterRemoteWorker.class);
protected ServiceNameRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
ServiceNameDataDefine.ServiceName serviceName = (ServiceNameDataDefine.ServiceName)message;
logger.debug("service name: {}", serviceName.getServiceName());
getClusterContext().lookup(ServiceNameRegisterSerialWorker.WorkerRole.INSTANCE).tell(serviceName);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceNameRegisterRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceNameRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceNameRegisterRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceNameRegisterRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceNameDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterSerialWorker.class);
public ServiceNameRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onWork(Object message) throws WorkerException {
if (message instanceof ServiceNameDataDefine.ServiceName) {
ServiceNameDataDefine.ServiceName serviceName = (ServiceNameDataDefine.ServiceName)message;
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
int serviceId = dao.getServiceId(serviceName.getApplicationId(), serviceName.getServiceName());
if (serviceId == 0) {
int min = dao.getMinServiceId();
if (min == 0) {
ServiceNameDataDefine.ServiceName noneServiceName = new ServiceNameDataDefine.ServiceName("1", Const.NONE_SERVICE_NAME, 0, Const.NONE_SERVICE_ID);
dao.save(noneServiceName);
serviceName.setServiceId(-1);
serviceName.setId("-1");
} else {
int max = dao.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
serviceName.setId(String.valueOf(serviceId));
serviceName.setServiceId(serviceId);
}
dao.save(serviceName);
}
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceNameRegisterSerialWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public ServiceNameRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ServiceNameRegisterSerialWorker(role(), clusterContext);
}
@Override public int queueSize() {
return 256;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return ServiceNameRegisterSerialWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ServiceNameDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
/**
* @author pengys5
*/
public interface IServiceNameDAO {
int getServiceId(int applicationId, String serviceName);
String getServiceName(int serviceId);
int getMaxServiceId();
int getMinServiceId();
void save(ServiceNameDataDefine.ServiceName serviceName);
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceNameEsDAO.class);
@Override public int getServiceId(int applicationId, String serviceName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ServiceNameTable.TABLE);
searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int serviceId = (int)searchHit.getSource().get(ServiceNameTable.COLUMN_SERVICE_ID);
return serviceId;
}
return 0;
}
@Override public int getMaxServiceId() {
return getMaxId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
}
@Override public int getMinServiceId() {
return getMinId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
}
@Override public String getServiceName(int serviceId) {
GetResponse response = getClient().prepareGet(ServiceNameTable.TABLE, String.valueOf(serviceId)).get();
if (response.isExists()) {
return (String)response.getSource().get(ServiceNameTable.COLUMN_SERVICE_NAME);
} else {
return Const.EMPTY_STRING;
}
}
@Override public void save(ServiceNameDataDefine.ServiceName serviceName) {
logger.debug("save service name register info, application id: {}, service name: {}", serviceName.getApplicationId(), serviceName.getServiceName());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap();
source.put(ServiceNameTable.COLUMN_SERVICE_ID, serviceName.getServiceId());
source.put(ServiceNameTable.COLUMN_APPLICATION_ID, serviceName.getApplicationId());
source.put(ServiceNameTable.COLUMN_SERVICE_NAME, serviceName.getServiceName());
IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, serviceName.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save service name register info, application id: {}, service name: {}, status: {}", serviceName.getApplicationId(), serviceName.getServiceName(), response.status().name());
}
}
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class ServiceNameH2DAO extends H2DAO implements IServiceNameDAO {
@Override public int getServiceId(int applicationId, String serviceName) {
return 0;
}
@Override public int getMaxServiceId() {
return 0;
}
@Override public int getMinServiceId() {
return 0;
}
@Override public String getServiceName(int serviceId) {
return null;
}
@Override public void save(ServiceNameDataDefine.ServiceName serviceName) {
}
}
......@@ -17,14 +17,6 @@ public class SegmentCostEsTableDefine extends ElasticSearchTableDefine {
return 5;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SEGMENT_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(SegmentCostTable.COLUMN_SERVICE_NAME, ElasticSearchColumnDefine.Type.Text.name()));
......
......@@ -17,14 +17,6 @@ public class SegmentEsTableDefine extends ElasticSearchTableDefine {
return 10;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentTable.COLUMN_DATA_BINARY, ElasticSearchColumnDefine.Type.Binary.name()));
}
......
......@@ -17,14 +17,6 @@ public class ServiceEntryEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -17,14 +17,6 @@ public class ServiceReferenceEsTableDefine extends ElasticSearchTableDefine {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
......@@ -7,14 +7,6 @@ agent_server:
host: localhost
port: 10800
context_path: /
agent_register:
grpc:
host: localhost
port: 11800
jetty:
host: localhost
port: 12800
context_path: /
agent_stream:
grpc:
host: localhost
......@@ -23,10 +15,6 @@ agent_stream:
host: localhost
port: 12800
context_path: /
agent_jvm:
grpc:
host: localhost
port: 11800
ui:
jetty:
host: localhost
......@@ -41,3 +29,5 @@ storage:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
......@@ -7,4 +7,6 @@ public class StorageElasticSearchConfig {
public static String CLUSTER_NAME;
public static Boolean CLUSTER_TRANSPORT_SNIFFER;
public static String CLUSTER_NODES;
public static Integer INDEX_SHARDS_NUMBER;
public static Integer INDEX_REPLICAS_NUMBER;
}
......@@ -14,6 +14,8 @@ public class StorageElasticSearchConfigParser implements ModuleConfigParser {
private static final String CLUSTER_NAME = "cluster_name";
private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
private static final String CLUSTER_NODES = "cluster_nodes";
private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NAME))) {
......@@ -25,5 +27,15 @@ public class StorageElasticSearchConfigParser implements ModuleConfigParser {
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NODES))) {
StorageElasticSearchConfig.CLUSTER_NODES = (String)config.get(CLUSTER_NODES);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(INDEX_SHARDS_NUMBER))) {
StorageElasticSearchConfig.INDEX_SHARDS_NUMBER = (Integer)config.get(INDEX_SHARDS_NUMBER);
} else {
StorageElasticSearchConfig.INDEX_SHARDS_NUMBER = 2;
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(INDEX_REPLICAS_NUMBER))) {
StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER = (Integer)config.get(INDEX_REPLICAS_NUMBER);
} else {
StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER = 0;
}
}
}
......@@ -11,6 +11,7 @@ import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.storage.ColumnDefine;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.core.storage.TableDefine;
import org.skywalking.apm.collector.storage.elasticsearch.StorageElasticSearchConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,8 +52,8 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) {
return Settings.builder()
.put("index.number_of_shards", tableDefine.numberOfShards())
.put("index.number_of_replicas", tableDefine.numberOfReplicas())
.put("index.number_of_shards", StorageElasticSearchConfig.INDEX_SHARDS_NUMBER)
.put("index.number_of_replicas", StorageElasticSearchConfig.INDEX_REPLICAS_NUMBER)
.put("index.refresh_interval", String.valueOf(tableDefine.refreshInterval()) + "s")
.put("analysis.analyzer.collector_analyzer.tokenizer", "collector_tokenizer")
......
......@@ -16,8 +16,4 @@ public abstract class ElasticSearchTableDefine extends TableDefine {
}
public abstract int refreshInterval();
public abstract int numberOfShards();
public abstract int numberOfReplicas();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册