提交 863933af 编写于 作者: P peng-yongsheng

trace workers

上级 5bd379fd
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.global;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class GlobalTracePersistenceWorker extends PersistenceWorker<GlobalTrace, GlobalTrace> {
public GlobalTracePersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IGlobalTracePersistenceDAO)getDaoService().get(IGlobalTracePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<GlobalTrace, GlobalTrace, GlobalTracePersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<GlobalTrace> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public GlobalTracePersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new GlobalTracePersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.instance;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class InstPerformancePersistenceWorker extends PersistenceWorker<InstPerformance, InstPerformance> {
public InstPerformancePersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(IInstPerformancePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstPerformance, InstPerformance, InstPerformancePersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<InstPerformance> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public InstPerformancePersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new InstPerformancePersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/**
* @author peng-yongsheng
*/
public class NodeComponentAggregationWorker extends AggregationWorker<NodeComponent, NodeComponent> {
public NodeComponentAggregationWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponent, NodeComponent, NodeComponentAggregationWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<NodeComponent> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public NodeComponentAggregationWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeComponentAggregationWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class NodeComponentPersistenceWorker extends PersistenceWorker<NodeComponent, NodeComponent> {
public NodeComponentPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(INodeComponentPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponent, NodeComponent, NodeComponentPersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<NodeComponent> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public NodeComponentPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeComponentPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class NodeComponentRemoteWorker extends AbstractRemoteWorker<NodeComponent, NodeComponent> {
public NodeComponentRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(NodeComponent nodeComponent) throws WorkerException {
onNext(nodeComponent);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeComponent, NodeComponent, NodeComponentRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override
public NodeComponentRemoteWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeComponentRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/**
* @author peng-yongsheng
*/
public class NodeMappingAggregationWorker extends AggregationWorker<NodeMapping, NodeMapping> {
public NodeMappingAggregationWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMapping, NodeMapping, NodeMappingAggregationWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<NodeMapping> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public NodeMappingAggregationWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeMappingAggregationWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class NodeMappingPersistenceWorker extends PersistenceWorker<NodeMapping, NodeMapping> {
public NodeMappingPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(INodeMappingPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeMapping, NodeMapping, NodeMappingPersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<NodeMapping> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public NodeMappingPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeMappingPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.node;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class NodeMappingRemoteWorker extends AbstractRemoteWorker<NodeMapping, NodeMapping> {
public NodeMappingRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(NodeMapping nodeMapping) throws WorkerException {
onNext(nodeMapping);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeMapping, NodeMapping, NodeMappingRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override
public NodeMappingRemoteWorker workerInstance(DAOService daoService, CacheServiceManager cacheServiceManager) {
return new NodeMappingRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.noderef;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/**
* @author peng-yongsheng
*/
public class NodeReferenceAggregationWorker extends AggregationWorker<NodeReference, NodeReference> {
public NodeReferenceAggregationWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeReference, NodeReference, NodeReferenceAggregationWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<NodeReference> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public NodeReferenceAggregationWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeReferenceAggregationWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.noderef;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class NodeReferencePersistenceWorker extends PersistenceWorker<NodeReference, NodeReference> {
public NodeReferencePersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(INodeReferencePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeReference, NodeReference, NodeReferencePersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<NodeReference> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public NodeReferencePersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeReferencePersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.noderef;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class NodeReferenceRemoteWorker extends AbstractRemoteWorker<NodeReference, NodeReference> {
public NodeReferenceRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(NodeReference nodeReference) throws WorkerException {
onNext(nodeReference);
}
public static class Factory extends AbstractRemoteWorkerProvider<NodeReference, NodeReference, NodeReferenceRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override
public NodeReferenceRemoteWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new NodeReferenceRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.segment;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class SegmentCostPersistenceWorker extends PersistenceWorker<SegmentCost, SegmentCost> {
public SegmentCostPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(ISegmentCostPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentCost, SegmentCost, SegmentCostPersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<SegmentCost> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public SegmentCostPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new SegmentCostPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.segment;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class SegmentPersistenceWorker extends PersistenceWorker<Segment, Segment> {
public SegmentPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return false;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(ISegmentPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<Segment, Segment, SegmentPersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<Segment> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public SegmentPersistenceWorker workerInstance(DAOService daoService, CacheServiceManager cacheServiceManager) {
return new SegmentPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/**
* @author peng-yongsheng
*/
public class ServiceEntryAggregationWorker extends AggregationWorker<ServiceEntry, ServiceEntry> {
public ServiceEntryAggregationWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceEntry, ServiceEntry, ServiceEntryAggregationWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<ServiceEntry> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public ServiceEntryAggregationWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ServiceEntryAggregationWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class ServiceEntryPersistenceWorker extends PersistenceWorker<ServiceEntry, ServiceEntry> {
public ServiceEntryPersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(IServiceEntryPersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceEntry, ServiceEntry, ServiceEntryPersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<ServiceEntry> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public ServiceEntryPersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ServiceEntryPersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class ServiceEntryRemoteWorker extends AbstractRemoteWorker<ServiceEntry, ServiceEntry> {
public ServiceEntryRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(ServiceEntry serviceEntry) throws WorkerException {
onNext(serviceEntry);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceEntry, ServiceEntry, ServiceEntryRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override
public ServiceEntryRemoteWorker workerInstance(DAOService daoService, CacheServiceManager cacheServiceManager) {
return new ServiceEntryRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceAggregationWorker extends AggregationWorker<ServiceReference, ServiceReference> {
public ServiceReferenceAggregationWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReference, ServiceReference, ServiceReferenceAggregationWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<ServiceReference> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public ServiceReferenceAggregationWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ServiceReferenceAggregationWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class ServiceReferencePersistenceWorker extends PersistenceWorker<ServiceReference, ServiceReference> {
public ServiceReferencePersistenceWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)getDaoService().get(IServiceReferencePersistenceDAO.class);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReference, ServiceReference, ServiceReferencePersistenceWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<ServiceReference> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public ServiceReferencePersistenceWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ServiceReferencePersistenceWorker(getDaoService(), getCacheServiceManager());
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.serviceref;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceRemoteWorker extends AbstractRemoteWorker<ServiceReference, ServiceReference> {
public ServiceReferenceRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(ServiceReference serviceReference) throws WorkerException {
onNext(serviceReference);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceReference, ServiceReference, ServiceReferenceRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override
public ServiceReferenceRemoteWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ServiceReferenceRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
......@@ -18,8 +18,11 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface IInstPerformanceDAO {
public interface IGlobalTracePersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface IInstPerformancePersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface INodeComponentPersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
......@@ -18,8 +18,11 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface INodeComponentDAO {
public interface INodeMappingPersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface INodeReferencePersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
/**
* @author peng-yongsheng
*/
public interface ISegmentCostDAO {
}
......@@ -18,8 +18,11 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface INodeReferenceDAO {
public interface ISegmentCostPersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
/**
* @author peng-yongsheng
*/
public interface ISegmentDAO {
}
......@@ -18,8 +18,11 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface IGlobalTraceStreamDAO {
public interface ISegmentPersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
/**
* @author peng-yongsheng
*/
public interface IServiceEntryDAO {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface IServiceEntryPersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
/**
* @author peng-yongsheng
*/
public interface IServiceReferenceDAO {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface IServiceReferencePersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
......@@ -48,4 +48,36 @@ public class NodeMapping extends Data {
public NodeMapping(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public String getAddress() {
return getDataString(1);
}
public void setAddress(String address) {
setDataString(1, address);
}
public int getApplicationId() {
return getDataInteger(0);
}
public void setApplicationId(int applicationId) {
setDataInteger(0, applicationId);
}
public int getAddressId() {
return getDataInteger(1);
}
public void setAddressId(int addressId) {
setDataInteger(1, addressId);
}
public long getTimeBucket() {
return getDataLong(0);
}
public void setTimeBucket(long timeBucket) {
setDataLong(0, timeBucket);
}
}
......@@ -23,8 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceStreamDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.skywalking.apm.collector.storage.table.global.GlobalTraceTable;
......@@ -34,9 +33,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GlobalTraceEsDAO extends EsDAO implements IGlobalTraceStreamDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GlobalTrace> {
public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GlobalTrace> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsDAO.class);
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsPersistenceDAO.class);
@Override public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
......
......@@ -23,8 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.storage.table.instance.InstPerformanceTable;
......@@ -34,9 +33,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstPerformance> {
public class InstPerformanceEsPersistenceDAO extends EsDAO implements IInstPerformancePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, InstPerformance> {
private final Logger logger = LoggerFactory.getLogger(InstPerformanceEsDAO.class);
private final Logger logger = LoggerFactory.getLogger(InstPerformanceEsPersistenceDAO.class);
@Override public InstPerformance get(String id) {
GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get();
......
......@@ -23,8 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
......@@ -32,7 +31,7 @@ import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
/**
* @author peng-yongsheng
*/
public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeComponent> {
public class NodeComponentEsPersistenceDAO extends EsDAO implements INodeComponentPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeComponent> {
@Override public NodeComponent get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeComponentTable.TABLE, id).get();
......
......@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
*/
public class NodeComponentEsUIDAO extends EsDAO implements INodeComponentUIDAO {
private final Logger logger = LoggerFactory.getLogger(NodeComponentEsDAO.class);
private final Logger logger = LoggerFactory.getLogger(NodeComponentEsPersistenceDAO.class);
@Override public JsonArray load(long startTime, long endTime) {
logger.debug("node component load, start time: {}, end time: {}", startTime, endTime);
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
/**
* @author peng-yongsheng
*/
public class NodeMappingEsPersistenceDAO extends EsDAO implements INodeMappingPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeMapping> {
@Override public NodeMapping get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeMappingTable.TABLE, id).get();
if (getResponse.isExists()) {
NodeMapping nodeMapping = new NodeMapping(id);
Map<String, Object> source = getResponse.getSource();
nodeMapping.setApplicationId(((Number)source.get(NodeMappingTable.COLUMN_APPLICATION_ID)).intValue());
nodeMapping.setAddressId(((Number)source.get(NodeMappingTable.COLUMN_ADDRESS_ID)).intValue());
nodeMapping.setAddress((String)source.get(NodeMappingTable.COLUMN_ADDRESS));
nodeMapping.setTimeBucket(((Number)source.get(NodeMappingTable.COLUMN_TIME_BUCKET)).longValue());
return nodeMapping;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(NodeMapping data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeMappingTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(NodeMappingTable.COLUMN_ADDRESS_ID, data.getAddressId());
source.put(NodeMappingTable.COLUMN_ADDRESS, data.getAddress());
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(NodeMappingTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(NodeMapping data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeMappingTable.COLUMN_APPLICATION_ID, data.getApplicationId());
source.put(NodeMappingTable.COLUMN_ADDRESS_ID, data.getAddressId());
source.put(NodeMappingTable.COLUMN_ADDRESS, data.getAddress());
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(NodeMappingTable.TABLE, data.getId()).setDoc(source);
}
}
......@@ -23,8 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.noderef.NodeReferenceTable;
......@@ -32,7 +31,7 @@ import org.skywalking.apm.collector.storage.table.noderef.NodeReferenceTable;
/**
* @author peng-yongsheng
*/
public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeReference> {
public class NodeReferenceEsPersistenceDAO extends EsDAO implements INodeReferencePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, NodeReference> {
@Override public NodeReference get(String id) {
GetResponse getResponse = getClient().prepareGet(NodeReferenceTable.TABLE, id).get();
......
......@@ -22,8 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
import org.skywalking.apm.collector.storage.table.segment.SegmentCostTable;
......@@ -33,9 +32,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class SegmentCostEsDAO extends EsDAO implements ISegmentCostDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, SegmentCost> {
public class SegmentCostEsPersistenceDAO extends EsDAO implements ISegmentCostPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, SegmentCost> {
private final Logger logger = LoggerFactory.getLogger(SegmentCostEsDAO.class);
private final Logger logger = LoggerFactory.getLogger(SegmentCostEsPersistenceDAO.class);
@Override public SegmentCost get(String id) {
return null;
......
......@@ -23,8 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.collector.storage.table.segment.SegmentTable;
......@@ -34,9 +33,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class SegmentEsDAO extends EsDAO implements ISegmentDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, Segment> {
public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, Segment> {
private final Logger logger = LoggerFactory.getLogger(SegmentEsDAO.class);
private final Logger logger = LoggerFactory.getLogger(SegmentEsPersistenceDAO.class);
@Override public Segment get(String id) {
return null;
......
......@@ -23,8 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.storage.table.service.ServiceEntryTable;
......@@ -32,7 +31,7 @@ import org.skywalking.apm.collector.storage.table.service.ServiceEntryTable;
/**
* @author peng-yongsheng
*/
public class ServiceEntryEsDAO extends EsDAO implements IServiceEntryDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceEntry> {
public class ServiceEntryEsPersistenceDAO extends EsDAO implements IServiceEntryPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceEntry> {
@Override public ServiceEntry get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceEntryTable.TABLE, id).get();
......
......@@ -23,8 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReferenceTable;
......@@ -34,9 +33,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReference> {
public class ServiceReferenceEsPersistenceDAO extends EsDAO implements IServiceReferencePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReference> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsDAO.class);
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsPersistenceDAO.class);
@Override public ServiceReference get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceTable.TABLE, id).get();
......
......@@ -21,9 +21,8 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.UnexpectedException;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceStreamDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
......@@ -34,8 +33,8 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceStreamDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, GlobalTrace> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceH2DAO.class);
public class GlobalTraceH2PersistenceDAO extends H2DAO implements IGlobalTracePersistenceDAO<H2SqlEntity, H2SqlEntity, GlobalTrace> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceH2PersistenceDAO.class);
@Override public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
......
......@@ -26,9 +26,8 @@ import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IInstPerformanceDAO;
import org.skywalking.apm.collector.storage.dao.IInstPerformancePersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
......@@ -39,9 +38,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, InstPerformance> {
public class InstPerformanceH2PersistenceDAO extends H2DAO implements IInstPerformancePersistenceDAO<H2SqlEntity, H2SqlEntity, InstPerformance> {
private final Logger logger = LoggerFactory.getLogger(InstPerformanceH2DAO.class);
private final Logger logger = LoggerFactory.getLogger(InstPerformanceH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
@Override public InstPerformance get(String id) {
......
......@@ -26,9 +26,8 @@ import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.INodeComponentDAO;
import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
......@@ -39,8 +38,8 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, NodeComponent> {
private final Logger logger = LoggerFactory.getLogger(NodeComponentH2DAO.class);
public class NodeComponentH2PersistenceDAO extends H2DAO implements INodeComponentPersistenceDAO<H2SqlEntity, H2SqlEntity, NodeComponent> {
private final Logger logger = LoggerFactory.getLogger(NodeComponentH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
@Override
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class NodeMappingH2PersistenceDAO extends H2DAO implements INodeMappingPersistenceDAO<H2SqlEntity, H2SqlEntity, NodeMapping> {
private final Logger logger = LoggerFactory.getLogger(NodeMappingH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
@Override public NodeMapping get(String id) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SQL, NodeMappingTable.TABLE, NodeMappingTable.COLUMN_ID);
Object[] params = new Object[] {id};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
NodeMapping nodeMapping = new NodeMapping(id);
nodeMapping.setApplicationId(rs.getInt(NodeMappingTable.COLUMN_APPLICATION_ID));
nodeMapping.setAddressId(rs.getInt(NodeMappingTable.COLUMN_ADDRESS_ID));
nodeMapping.setAddress(rs.getString(NodeMappingTable.COLUMN_ADDRESS));
nodeMapping.setTimeBucket(rs.getLong(NodeMappingTable.COLUMN_TIME_BUCKET));
return nodeMapping;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return null;
}
@Override public H2SqlEntity prepareBatchInsert(NodeMapping nodeMapping) {
Map<String, Object> source = new HashMap<>();
H2SqlEntity entity = new H2SqlEntity();
source.put(NodeMappingTable.COLUMN_ID, nodeMapping.getId());
source.put(NodeMappingTable.COLUMN_APPLICATION_ID, nodeMapping.getApplicationId());
source.put(NodeMappingTable.COLUMN_ADDRESS_ID, nodeMapping.getAddressId());
source.put(NodeMappingTable.COLUMN_ADDRESS, nodeMapping.getAddress());
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, nodeMapping.getTimeBucket());
String sql = SqlBuilder.buildBatchInsertSql(NodeMappingTable.TABLE, source.keySet());
entity.setSql(sql);
entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
@Override public H2SqlEntity prepareBatchUpdate(NodeMapping nodeMapping) {
Map<String, Object> source = new HashMap<>();
H2SqlEntity entity = new H2SqlEntity();
source.put(NodeMappingTable.COLUMN_APPLICATION_ID, nodeMapping.getApplicationId());
source.put(NodeMappingTable.COLUMN_ADDRESS_ID, nodeMapping.getAddressId());
source.put(NodeMappingTable.COLUMN_ADDRESS, nodeMapping.getAddress());
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, nodeMapping.getTimeBucket());
String sql = SqlBuilder.buildBatchUpdateSql(NodeMappingTable.TABLE, source.keySet(), NodeMappingTable.COLUMN_ID);
entity.setSql(sql);
List<Object> values = new ArrayList<>(source.values());
values.add(nodeMapping.getId());
entity.setParams(values.toArray(new Object[0]));
return entity;
}
}
......@@ -26,9 +26,8 @@ import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.storage.dao.INodeReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
......@@ -39,9 +38,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, NodeReference> {
public class NodeReferenceH2PersistenceDAO extends H2DAO implements INodeReferencePersistenceDAO<H2SqlEntity, H2SqlEntity, NodeReference> {
private final Logger logger = LoggerFactory.getLogger(NodeReferenceH2DAO.class);
private final Logger logger = LoggerFactory.getLogger(NodeReferenceH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
@Override public NodeReference get(String id) {
......
......@@ -20,9 +20,8 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentCostPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
......@@ -33,9 +32,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, SegmentCost> {
public class SegmentCostH2PersistenceDAO extends H2DAO implements ISegmentCostPersistenceDAO<H2SqlEntity, H2SqlEntity, SegmentCost> {
private final Logger logger = LoggerFactory.getLogger(SegmentCostH2DAO.class);
private final Logger logger = LoggerFactory.getLogger(SegmentCostH2PersistenceDAO.class);
@Override public SegmentCost get(String id) {
return null;
......
......@@ -20,9 +20,8 @@ package org.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.ISegmentDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.segment.Segment;
......@@ -33,9 +32,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, Segment> {
public class SegmentH2PersistenceDAO extends H2DAO implements ISegmentPersistenceDAO<H2SqlEntity, H2SqlEntity, Segment> {
private final Logger logger = LoggerFactory.getLogger(SegmentH2DAO.class);
private final Logger logger = LoggerFactory.getLogger(SegmentH2PersistenceDAO.class);
@Override public Segment get(String id) {
return null;
......
......@@ -26,9 +26,8 @@ import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IServiceEntryDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
......@@ -39,9 +38,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceEntry> {
public class ServiceEntryH2PersistenceDAO extends H2DAO implements IServiceEntryPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceEntry> {
private final Logger logger = LoggerFactory.getLogger(ServiceEntryH2DAO.class);
private final Logger logger = LoggerFactory.getLogger(ServiceEntryH2PersistenceDAO.class);
private static final String GET_SERVICE_ENTRY_SQL = "select * from {0} where {1} = ?";
@Override public ServiceEntry get(String id) {
......
......@@ -26,9 +26,8 @@ import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IServiceReferenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
......@@ -39,9 +38,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceReference> {
public class ServiceReferenceH2PersistenceDAO extends H2DAO implements IServiceReferencePersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceReference> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceH2DAO.class);
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册