提交 58aaaf2d 编写于 作者: P peng-yongsheng

Add instance mapping analysis to collect the relationship between instance and it’s ip:port.

上级 7a415470
......@@ -35,4 +35,5 @@ public class MetricGraphIdDefine {
public static final int SERVICE_ENTRY_GRAPH_ID = 408;
public static final int GLOBAL_TRACE_GRAPH_ID = 409;
public static final int SEGMENT_COST_GRAPH_ID = 410;
public static final int INSTANCE_MAPPING_GRAPH_ID = 411;
}
......@@ -65,4 +65,8 @@ public class MetricWorkerIdDefine {
public static final int APPLICATION_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 430;
public static final int SERVICE_METRIC_GRAPH_BRIDGE_WORKER_ID = 431;
public static final int INSTANCE_METRIC_GRAPH_BRIDGE_WORKER_ID = 432;
public static final int INSTANCE_MAPPING_AGGREGATION_WORKER_ID = 433;
public static final int INSTANCE_MAPPING_REMOTE_WORKER_ID = 434;
public static final int INSTANCE_MAPPING_PERSISTENCE_WORKER_ID = 435;
}
......@@ -28,6 +28,8 @@ import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.appli
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.ApplicationReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMappingGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMappingSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.InstanceReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentCostGraph;
......@@ -87,6 +89,7 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
segmentParserListenerRegister.register(new ServiceReferenceMetricSpanListener.Factory());
segmentParserListenerRegister.register(new ApplicationComponentSpanListener.Factory());
segmentParserListenerRegister.register(new ApplicationMappingSpanListener.Factory());
segmentParserListenerRegister.register(new InstanceMappingSpanListener.Factory());
segmentParserListenerRegister.register(new ServiceEntrySpanListener.Factory());
segmentParserListenerRegister.register(new GlobalTraceSpanListener.Factory());
segmentParserListenerRegister.register(new SegmentCostSpanListener.Factory());
......@@ -117,6 +120,9 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
ApplicationMappingGraph applicationMappingGraph = new ApplicationMappingGraph(getManager(), workerCreateListener);
applicationMappingGraph.create();
InstanceMappingGraph instanceMappingGraph = new InstanceMappingGraph(getManager(), workerCreateListener);
instanceMappingGraph.create();
ServiceEntryGraph serviceEntryGraph = new ServiceEntryGraph(getManager(), workerCreateListener);
serviceEntryGraph.create();
......
......@@ -46,14 +46,16 @@ public class ApplicationMappingSpanListener implements FirstSpanListener, EntryS
private long timeBucket;
@Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
logger.debug("node mapping listener parse reference");
logger.debug("application mapping listener parse reference");
if (spanDecorator.getRefsCount() > 0) {
ApplicationMapping applicationMapping = new ApplicationMapping(Const.EMPTY_STRING);
applicationMapping.setApplicationId(applicationId);
applicationMapping.setAddressId(spanDecorator.getRefs(0).getNetworkAddressId());
String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(applicationMapping.getAddressId());
applicationMapping.setId(id);
applicationMappings.add(applicationMapping);
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ApplicationMapping applicationMapping = new ApplicationMapping(Const.EMPTY_STRING);
applicationMapping.setApplicationId(applicationId);
applicationMapping.setAddressId(spanDecorator.getRefs(i).getNetworkAddressId());
String id = String.valueOf(applicationId) + Const.ID_SPLIT + String.valueOf(applicationMapping.getAddressId());
applicationMapping.setId(id);
applicationMappings.add(applicationMapping);
}
}
}
......@@ -64,12 +66,12 @@ public class ApplicationMappingSpanListener implements FirstSpanListener, EntryS
}
@Override public void build() {
logger.debug("node mapping listener build");
logger.debug("application mapping listener build");
Graph<ApplicationMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.APPLICATION_MAPPING_GRAPH_ID, ApplicationMapping.class);
applicationMappings.forEach(applicationMapping -> {
applicationMapping.setId(timeBucket + Const.ID_SPLIT + applicationMapping.getId());
applicationMapping.setTimeBucket(timeBucket);
logger.debug("push to node mapping aggregation worker, id: {}", applicationMapping.getId());
logger.debug("push to application mapping aggregation worker, id: {}", applicationMapping.getId());
graph.start(applicationMapping);
});
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.AggregationWorker;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
/**
* @author peng-yongsheng
*/
public class InstanceMappingAggregationWorker extends AggregationWorker<InstanceMapping, InstanceMapping> {
InstanceMappingAggregationWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.INSTANCE_MAPPING_AGGREGATION_WORKER_ID;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstanceMapping, InstanceMapping, InstanceMappingAggregationWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public InstanceMappingAggregationWorker workerInstance(ModuleManager moduleManager) {
return new InstanceMappingAggregationWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
/**
* @author peng-yongsheng
*/
public class InstanceMappingGraph {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
public InstanceMappingGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}
public void create() {
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.INSTANCE_MAPPING_GRAPH_ID, InstanceMapping.class)
.addNode(new InstanceMappingAggregationWorker.Factory(moduleManager).create(workerCreateListener))
.addNext(new InstanceMappingRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.INSTANCE_MAPPING_GRAPH_ID).create(workerCreateListener))
.addNext(new InstanceMappingAggregationWorker.Factory(moduleManager).create(workerCreateListener));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IApplicationMappingPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
/**
* @author peng-yongsheng
*/
public class InstanceMappingPersistenceWorker extends PersistenceWorker<InstanceMapping> {
InstanceMappingPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.INSTANCE_MAPPING_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
return true;
}
@SuppressWarnings("unchecked")
@Override protected IPersistenceDAO<?, ?, InstanceMapping> persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IApplicationMappingPersistenceDAO.class);
}
public static class Factory extends PersistenceWorkerProvider<InstanceMapping, InstanceMappingPersistenceWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public InstanceMappingPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new InstanceMappingPersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.apache.skywalking.apm.collector.remote.service.Selector;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
/**
* @author peng-yongsheng
*/
public class InstanceMappingRemoteWorker extends AbstractRemoteWorker<InstanceMapping, InstanceMapping> {
InstanceMappingRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.INSTANCE_MAPPING_REMOTE_WORKER_ID;
}
@Override protected void onWork(InstanceMapping instanceMapping) throws WorkerException {
onNext(instanceMapping);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<InstanceMapping, InstanceMapping, InstanceMappingRemoteWorker> {
public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) {
super(moduleManager, remoteSenderService, graphId);
}
@Override public InstanceMappingRemoteWorker workerInstance(ModuleManager moduleManager) {
return new InstanceMappingRemoteWorker(moduleManager);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.EntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.FirstSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.SpanListenerFactory;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceMappingSpanListener implements FirstSpanListener, EntrySpanListener {
private final Logger logger = LoggerFactory.getLogger(InstanceMappingSpanListener.class);
private List<InstanceMapping> instanceMappings = new LinkedList<>();
private long timeBucket;
@Override public void parseEntry(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
logger.debug("instance mapping listener parse reference");
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
InstanceMapping instanceMapping = new InstanceMapping(Const.EMPTY_STRING);
instanceMapping.setApplicationId(applicationId);
instanceMapping.setInstanceId(instanceId);
instanceMapping.setAddressId(spanDecorator.getRefs(i).getNetworkAddressId());
String id = String.valueOf(instanceId) + Const.ID_SPLIT + String.valueOf(instanceMapping.getAddressId());
instanceMapping.setId(id);
instanceMappings.add(instanceMapping);
}
}
}
@Override
public void parseFirst(SpanDecorator spanDecorator, int applicationId, int instanceId,
String segmentId) {
timeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(spanDecorator.getStartTime());
}
@Override public void build() {
logger.debug("instance mapping listener build");
Graph<InstanceMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_MAPPING_GRAPH_ID, InstanceMapping.class);
instanceMappings.forEach(instanceMapping -> {
instanceMapping.setId(timeBucket + Const.ID_SPLIT + instanceMapping.getId());
instanceMapping.setTimeBucket(timeBucket);
logger.debug("push to instance mapping aggregation worker, id: {}", instanceMapping.getId());
graph.start(instanceMapping);
});
}
public static class Factory implements SpanListenerFactory {
@Override public SpanListener create(ModuleManager moduleManager) {
return new InstanceMappingSpanListener();
}
}
}
......@@ -45,6 +45,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmListPersist
import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO;
......@@ -127,6 +128,7 @@ public class StorageModule extends Module {
classes.add(IInstanceMetricPersistenceDAO.class);
classes.add(IInstanceReferenceMetricPersistenceDAO.class);
classes.add(IInstanceMappingPersistenceDAO.class);
classes.add(IInstanceHeartBeatPersistenceDAO.class);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.dao;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
/**
* @author peng-yongsheng
*/
public interface IInstanceMappingPersistenceDAO<Insert, Update, DataImpl extends InstanceMapping> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.table.instance;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.Data;
import org.apache.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
/**
* @author peng-yongsheng
*/
public class InstanceMapping extends Data {
private static final Column[] STRING_COLUMNS = {
new Column(InstanceMappingTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(InstanceMappingTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(InstanceMappingTable.COLUMN_APPLICATION_ID, new CoverOperation()),
new Column(InstanceMappingTable.COLUMN_INSTANCE_ID, new CoverOperation()),
new Column(InstanceMappingTable.COLUMN_ADDRESS_ID, new CoverOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public InstanceMapping(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public int getApplicationId() {
return getDataInteger(0);
}
public void setApplicationId(int applicationId) {
setDataInteger(0, applicationId);
}
public int getInstanceId() {
return getDataInteger(1);
}
public void setInstanceId(int instanceId) {
setDataInteger(1, instanceId);
}
public int getAddressId() {
return getDataInteger(2);
}
public void setAddressId(int addressId) {
setDataInteger(2, addressId);
}
public long getTimeBucket() {
return getDataLong(0);
}
public void setTimeBucket(long timeBucket) {
setDataLong(0, timeBucket);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.table.instance;
import org.apache.skywalking.apm.collector.core.data.CommonTable;
/**
* @author peng-yongsheng
*/
public class InstanceMappingTable extends CommonTable {
public static final String TABLE = "instance_mapping";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_INSTANCE_ID = "instance_id";
public static final String COLUMN_ADDRESS_ID = "address_id";
}
......@@ -54,6 +54,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmListPersist
import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO;
......@@ -107,6 +108,7 @@ import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsCacheDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsRegisterDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMappingEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceMetricEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.InstanceReferenceAlarmEsPersistenceDAO;
......@@ -237,6 +239,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceMappingPersistenceDAO.class, new InstanceMappingEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMappingTable;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceMappingEsPersistenceDAO extends EsDAO implements IInstanceMappingPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstanceMapping> {
private final Logger logger = LoggerFactory.getLogger(InstanceMappingEsPersistenceDAO.class);
public InstanceMappingEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public InstanceMapping get(String id) {
GetResponse getResponse = getClient().prepareGet(InstanceMappingTable.TABLE, id).get();
if (getResponse.isExists()) {
InstanceMapping instanceMapping = new InstanceMapping(id);
Map<String, Object> source = getResponse.getSource();
instanceMapping.setApplicationId(((Number)source.get(InstanceMappingTable.COLUMN_APPLICATION_ID)).intValue());
instanceMapping.setInstanceId(((Number)source.get(InstanceMappingTable.COLUMN_INSTANCE_ID)).intValue());
instanceMapping.setAddressId(((Number)source.get(InstanceMappingTable.COLUMN_ADDRESS_ID)).intValue());
instanceMapping.setTimeBucket(((Number)source.get(InstanceMappingTable.COLUMN_TIME_BUCKET)).longValue());
return instanceMapping;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(InstanceMapping data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceMappingTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceMappingTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(InstanceMappingTable.COLUMN_ADDRESS_ID, data.getAddressId());
source.put(InstanceMappingTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(InstanceMappingTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(InstanceMapping data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceMappingTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(InstanceMappingTable.COLUMN_INSTANCE_ID, data.getInstanceId());
source.put(InstanceMappingTable.COLUMN_ADDRESS_ID, data.getAddressId());
source.put(InstanceMappingTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(InstanceMappingTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(InstanceMappingTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(InstanceMappingTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, InstanceMappingTable.TABLE);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMappingTable;
/**
* @author peng-yongsheng
*/
public class InstanceMappingEsTableDefine extends ElasticSearchTableDefine {
public InstanceMappingEsTableDefine() {
super(InstanceMappingTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceMappingTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMappingTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMappingTable.COLUMN_ADDRESS_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMappingTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
......@@ -20,6 +20,7 @@ org.apache.skywalking.apm.collector.storage.es.define.MemoryPoolMetricEsTableDef
org.apache.skywalking.apm.collector.storage.es.define.GlobalTraceEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.ApplicationComponentEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.ApplicationMappingEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.InstanceMappingEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.SegmentCostEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.SegmentEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.ServiceEntryEsTableDefine
......
......@@ -50,6 +50,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmListPersist
import org.apache.skywalking.apm.collector.storage.dao.IInstanceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceReferenceAlarmListPersistenceDAO;
......@@ -103,6 +104,7 @@ import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceH2CacheDAO;
import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceH2RegisterDAO;
import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceH2UIDAO;
import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2PersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceMappingH2PersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2PersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2UIDAO;
import org.apache.skywalking.apm.collector.storage.h2.dao.InstanceReferenceAlarmH2PersistenceDAO;
......@@ -213,9 +215,10 @@ public class StorageModuleH2Provider extends ModuleProvider {
this.registerServiceImplementation(IServiceMetricPersistenceDAO.class, new ServiceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceMappingPersistenceDAO.class, new InstanceMappingH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatH2PersistenceDAO(h2Client));
}
private void registerUiDAO() throws ServiceNotProvidedException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.h2.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceMappingPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMappingTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class InstanceMappingH2PersistenceDAO extends H2DAO implements IInstanceMappingPersistenceDAO<H2SqlEntity, H2SqlEntity, InstanceMapping> {
private final Logger logger = LoggerFactory.getLogger(InstanceMappingH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
public InstanceMappingH2PersistenceDAO(H2Client client) {
super(client);
}
@Override public InstanceMapping get(String id) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SQL, InstanceMappingTable.TABLE, InstanceMappingTable.COLUMN_ID);
Object[] params = new Object[] {id};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
InstanceMapping instanceMapping = new InstanceMapping(id);
instanceMapping.setApplicationId(rs.getInt(InstanceMappingTable.COLUMN_APPLICATION_ID));
instanceMapping.setInstanceId(rs.getInt(InstanceMappingTable.COLUMN_INSTANCE_ID));
instanceMapping.setAddressId(rs.getInt(InstanceMappingTable.COLUMN_ADDRESS_ID));
instanceMapping.setTimeBucket(rs.getLong(InstanceMappingTable.COLUMN_TIME_BUCKET));
return instanceMapping;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return null;
}
@Override public H2SqlEntity prepareBatchInsert(InstanceMapping instanceMapping) {
Map<String, Object> source = new HashMap<>();
H2SqlEntity entity = new H2SqlEntity();
source.put(InstanceMappingTable.COLUMN_ID, instanceMapping.getId());
source.put(InstanceMappingTable.COLUMN_APPLICATION_ID, instanceMapping.getApplicationId());
source.put(InstanceMappingTable.COLUMN_INSTANCE_ID, instanceMapping.getInstanceId());
source.put(InstanceMappingTable.COLUMN_ADDRESS_ID, instanceMapping.getAddressId());
source.put(InstanceMappingTable.COLUMN_TIME_BUCKET, instanceMapping.getTimeBucket());
String sql = SqlBuilder.buildBatchInsertSql(InstanceMappingTable.TABLE, source.keySet());
entity.setSql(sql);
entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
@Override public H2SqlEntity prepareBatchUpdate(InstanceMapping instanceMapping) {
Map<String, Object> source = new HashMap<>();
H2SqlEntity entity = new H2SqlEntity();
source.put(InstanceMappingTable.COLUMN_APPLICATION_ID, instanceMapping.getApplicationId());
source.put(InstanceMappingTable.COLUMN_INSTANCE_ID, instanceMapping.getInstanceId());
source.put(InstanceMappingTable.COLUMN_ADDRESS_ID, instanceMapping.getAddressId());
source.put(InstanceMappingTable.COLUMN_TIME_BUCKET, instanceMapping.getTimeBucket());
String sql = SqlBuilder.buildBatchUpdateSql(InstanceMappingTable.TABLE, source.keySet(), InstanceMappingTable.COLUMN_ID);
entity.setSql(sql);
List<Object> values = new ArrayList<>(source.values());
values.add(instanceMapping.getId());
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.h2.define;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2ColumnDefine;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2TableDefine;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMappingTable;
/**
* @author peng-yongsheng
*/
public class InstanceMappingH2TableDefine extends H2TableDefine {
public InstanceMappingH2TableDefine() {
super(InstanceMappingTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(InstanceMappingTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstanceMappingTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceMappingTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceMappingTable.COLUMN_ADDRESS_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceMappingTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
......@@ -10,6 +10,7 @@ org.apache.skywalking.apm.collector.storage.h2.define.InstanceMetricH2TableDefin
org.apache.skywalking.apm.collector.storage.h2.define.InstanceReferenceMetricH2TableDefine
org.apache.skywalking.apm.collector.storage.h2.define.ApplicationComponentH2TableDefine
org.apache.skywalking.apm.collector.storage.h2.define.ApplicationMappingH2TableDefine
org.apache.skywalking.apm.collector.storage.h2.define.InstanceMappingH2TableDefine
org.apache.skywalking.apm.collector.storage.h2.define.ApplicationReferenceMetricH2TableDefine
org.apache.skywalking.apm.collector.storage.h2.define.SegmentCostH2TableDefine
org.apache.skywalking.apm.collector.storage.h2.define.SegmentH2TableDefine
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册