提交 5bd379fd 编写于 作者: P peng-yongsheng

Register workers

上级 d85f9dae
......@@ -52,6 +52,7 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Ap
if (applicationId == 0) {
IApplicationStreamDAO dao = (IApplicationStreamDAO)getDaoService().get(IApplicationStreamDAO.class);
Application newApplication;
int min = dao.getMinApplicationId();
if (min == 0) {
Application userApplication = new Application(String.valueOf(Const.USER_ID));
......@@ -59,16 +60,18 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Ap
userApplication.setApplicationId(Const.USER_ID);
dao.save(userApplication);
application = new Application("-1");
application.setApplicationId(-1);
newApplication = new Application("-1");
newApplication.setApplicationId(-1);
newApplication.setApplicationCode(application.getApplicationCode());
} else {
int max = dao.getMaxApplicationId();
applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application = new Application(String.valueOf(applicationId));
application.setApplicationId(applicationId);
newApplication = new Application(String.valueOf(applicationId));
newApplication.setApplicationId(applicationId);
newApplication.setApplicationCode(application.getApplicationCode());
}
dao.save(application);
dao.save(newApplication);
}
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker<Instance, Instance> {
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterRemoteWorker.class);
@Override public int id() {
return 0;
}
public InstanceRegisterRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override protected void onWork(Instance instance) throws WorkerException {
logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
onNext(instance);
}
public static class Factory extends AbstractRemoteWorkerProvider<Instance, Instance, InstanceRegisterRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override
public InstanceRegisterRemoteWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new InstanceRegisterRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.dao.IInstanceStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker<Instance, Instance> {
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterSerialWorker.class);
public InstanceRegisterSerialWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(Instance instance) throws WorkerException {
logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
int instanceId = getCacheServiceManager().getInstanceCacheService().getInstanceId(instance.getApplicationId(), instance.getAgentUUID());
if (instanceId == 0) {
IInstanceStreamDAO dao = (IInstanceStreamDAO)getDaoService().get(IInstanceStreamDAO.class);
Instance newInstance;
int min = dao.getMinInstanceId();
int max = dao.getMaxInstanceId();
if (min == 0 && max == 0) {
newInstance = new Instance("1");
newInstance.setInstanceId(1);
newInstance.setApplicationId(instance.getApplicationId());
newInstance.setAgentUUID(instance.getAgentUUID());
newInstance.setHeartBeatTime(instance.getHeartBeatTime());
newInstance.setOsInfo(instance.getOsInfo());
newInstance.setRegisterTime(instance.getRegisterTime());
} else {
newInstance = new Instance(String.valueOf(max + 1));
newInstance.setInstanceId(max + 1);
newInstance.setApplicationId(instance.getApplicationId());
newInstance.setAgentUUID(instance.getAgentUUID());
newInstance.setHeartBeatTime(instance.getHeartBeatTime());
newInstance.setOsInfo(instance.getOsInfo());
newInstance.setRegisterTime(instance.getRegisterTime());
}
dao.save(newInstance);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<Instance, Instance, InstanceRegisterSerialWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<Instance> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override
public InstanceRegisterSerialWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new InstanceRegisterSerialWorker(getDaoService(), getCacheServiceManager());
}
@Override public int queueSize() {
return 256;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.remote.service.RemoteClientService;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker<ServiceName, ServiceName> {
private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterRemoteWorker.class);
public ServiceNameRegisterRemoteWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(ServiceName serviceName) throws WorkerException {
onNext(serviceName);
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceName, ServiceName, ServiceNameRegisterRemoteWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
RemoteClientService remoteClientService) {
super(daoService, cacheServiceManager, remoteClientService);
}
@Override public ServiceNameRegisterRemoteWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ServiceNameRegisterRemoteWorker(getDaoService(), getCacheServiceManager());
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.register;
import org.skywalking.apm.collector.agent.stream.IdAutoIncrement;
import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.dao.IServiceNameStreamDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<ServiceName, ServiceName> {
private final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterSerialWorker.class);
public ServiceNameRegisterSerialWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager);
}
@Override public int id() {
return 0;
}
@Override protected void onWork(ServiceName serviceName) throws WorkerException {
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
int serviceId = getCacheServiceManager().getServiceIdCacheService().get(serviceName.getApplicationId(), serviceName.getServiceName());
if (serviceId == 0) {
IServiceNameStreamDAO dao = (IServiceNameStreamDAO)getDaoService().get(IServiceNameStreamDAO.class);
ServiceName newServiceName;
int min = dao.getMinServiceId();
if (min == 0) {
ServiceName noneServiceName = new ServiceName("1");
noneServiceName.setApplicationId(0);
noneServiceName.setServiceId(Const.NONE_SERVICE_ID);
noneServiceName.setServiceName(Const.NONE_SERVICE_NAME);
dao.save(noneServiceName);
newServiceName = new ServiceName("-1");
newServiceName.setApplicationId(serviceName.getApplicationId());
newServiceName.setServiceId(-1);
newServiceName.setServiceName(serviceName.getServiceName());
} else {
int max = dao.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
newServiceName = new ServiceName(String.valueOf(serviceId));
newServiceName.setApplicationId(serviceName.getApplicationId());
newServiceName.setServiceId(serviceId);
newServiceName.setServiceName(serviceName.getServiceName());
}
dao.save(newServiceName);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceName, ServiceName, ServiceNameRegisterSerialWorker> {
public Factory(DAOService daoService, CacheServiceManager cacheServiceManager,
QueueCreatorService<ServiceName> queueCreatorService) {
super(daoService, cacheServiceManager, queueCreatorService);
}
@Override public ServiceNameRegisterSerialWorker workerInstance(DAOService daoService,
CacheServiceManager cacheServiceManager) {
return new ServiceNameRegisterSerialWorker(getDaoService(), getCacheServiceManager());
}
@Override public int queueSize() {
return 256;
}
}
}
......@@ -25,4 +25,6 @@ import org.skywalking.apm.collector.core.module.Service;
*/
public interface InstanceCacheService extends Service {
int get(int applicationInstanceId);
int getInstanceId(int applicationId, String agentUUID);
}
......@@ -21,6 +21,7 @@ package org.skywalking.apm.collector.cache.guava.service;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.slf4j.Logger;
......@@ -35,6 +36,8 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
private final Cache<Integer, Integer> integerCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final Cache<String, Integer> stringCache = CacheBuilder.newBuilder().initialCapacity(100).maximumSize(5000).build();
private final DAOService daoService;
public InstanceCacheGuavaService(DAOService daoService) {
......@@ -59,4 +62,24 @@ public class InstanceCacheGuavaService implements InstanceCacheService {
}
return applicationId;
}
@Override public int getInstanceId(int applicationId, String agentUUID) {
IInstanceCacheDAO dao = (IInstanceCacheDAO)daoService.get(IInstanceCacheDAO.class);
String key = applicationId + Const.ID_SPLIT + agentUUID;
int instanceId = 0;
try {
instanceId = stringCache.get(key, () -> dao.getInstanceId(applicationId, agentUUID));
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
if (instanceId == 0) {
instanceId = dao.getInstanceId(applicationId, agentUUID);
if (applicationId != 0) {
stringCache.put(key, instanceId);
}
}
return instanceId;
}
}
......@@ -24,5 +24,7 @@ import org.skywalking.apm.collector.storage.base.dao.DAO;
* @author peng-yongsheng
*/
public interface IInstanceCacheDAO extends DAO {
int getApplicationId(int applicationInstanceId);
int getApplicationId(int instanceId);
int getInstanceId(int applicationId, String agentUUID);
}
......@@ -18,14 +18,13 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.storage.base.dao.DAO;
import org.skywalking.apm.collector.storage.table.register.Instance;
/**
* @author peng-yongsheng
*/
public interface IInstanceStreamDAO {
int getInstanceId(int applicationId, String agentUUID);
public interface IInstanceStreamDAO extends DAO {
int getMaxInstanceId();
int getMinInstanceId();
......
......@@ -18,12 +18,13 @@
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.storage.base.dao.DAO;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
*/
public interface IServiceNameDAO {
public interface IServiceNameStreamDAO extends DAO {
int getMaxServiceId();
int getMinServiceId();
......
......@@ -51,11 +51,23 @@ public class ServiceName extends Data {
return getDataString(1);
}
public void setServiceName(String serviceName) {
setDataString(1, serviceName);
}
public int getApplicationId() {
return getDataInteger(0);
}
public void setApplicationId(int applicationId) {
setDataInteger(0, applicationId);
}
public int getServiceId() {
return getDataInteger(1);
}
public void setServiceId(int serviceId) {
setDataInteger(1, serviceId);
}
}
......@@ -19,6 +19,13 @@
package org.skywalking.apm.collector.storage.es.dao;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
......@@ -32,12 +39,32 @@ public class InstanceEsCacheDAO extends EsDAO implements IInstanceCacheDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceEsCacheDAO.class);
@Override public int getApplicationId(int applicationInstanceId) {
GetResponse response = getClient().prepareGet(InstanceTable.TABLE, String.valueOf(applicationInstanceId)).get();
@Override public int getApplicationId(int instanceId) {
GetResponse response = getClient().prepareGet(InstanceTable.TABLE, String.valueOf(instanceId)).get();
if (response.isExists()) {
return (int)response.getSource().get(InstanceTable.COLUMN_APPLICATION_ID);
} else {
return 0;
}
}
@Override public int getInstanceId(int applicationId, String agentUUID) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(InstanceTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENT_UUID, agentUUID));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
return (int)searchHit.getSource().get(InstanceTable.COLUMN_INSTANCE_ID);
}
return 0;
}
}
......@@ -21,14 +21,8 @@ package org.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IInstanceStreamDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
......@@ -44,26 +38,6 @@ public class InstanceEsStreamDAO extends EsDAO implements IInstanceStreamDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceEsStreamDAO.class);
@Override public int getInstanceId(int applicationId, String agentUUID) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(InstanceTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENT_UUID, agentUUID));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
return (int)searchHit.getSource().get(InstanceTable.COLUMN_INSTANCE_ID);
}
return 0;
}
@Override public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
......
......@@ -23,7 +23,7 @@ import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameStreamDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
......@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameEsDAO extends EsDAO implements IServiceNameDAO {
public class ServiceNameEsStreamDAO extends EsDAO implements IServiceNameStreamDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceNameEsDAO.class);
private final Logger logger = LoggerFactory.getLogger(ServiceNameEsStreamDAO.class);
@Override public int getMaxServiceId() {
return getMaxId(ServiceNameTable.TABLE, ServiceNameTable.COLUMN_SERVICE_ID);
......
......@@ -22,9 +22,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,12 +37,13 @@ public class InstanceH2CacheDAO extends H2DAO implements IInstanceCacheDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceH2CacheDAO.class);
private static final String GET_APPLICATION_ID_SQL = "select {0} from {1} where {2} = ?";
private static final String GET_INSTANCE_ID_SQL = "select {0} from {1} where {2} = ? and {3} = ?";
@Override public int getApplicationId(int applicationInstanceId) {
logger.info("get the application getId with application getId = {}", applicationInstanceId);
@Override public int getApplicationId(int instanceId) {
logger.info("get the application getId with application getId = {}", instanceId);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_APPLICATION_ID_SQL, InstanceTable.COLUMN_APPLICATION_ID, InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
Object[] params = new Object[] {applicationInstanceId};
Object[] params = new Object[] {instanceId};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
return rs.getInt(InstanceTable.COLUMN_APPLICATION_ID);
......@@ -52,4 +53,20 @@ public class InstanceH2CacheDAO extends H2DAO implements IInstanceCacheDAO {
}
return 0;
}
@Override public int getInstanceId(int applicationId, String agentUUID) {
logger.info("get the application getId with application getId = {}, agentUUID = {}", applicationId, agentUUID);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_INSTANCE_ID_SQL, InstanceTable.COLUMN_INSTANCE_ID, InstanceTable.TABLE, InstanceTable.COLUMN_APPLICATION_ID,
InstanceTable.COLUMN_AGENT_UUID);
Object[] params = new Object[] {applicationId, agentUUID};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
return rs.getInt(InstanceTable.COLUMN_INSTANCE_ID);
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return 0;
}
}
......@@ -18,8 +18,6 @@
package org.skywalking.apm.collector.storage.h2.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
......@@ -38,25 +36,8 @@ import org.slf4j.LoggerFactory;
public class InstanceH2StreamDAO extends H2DAO implements IInstanceStreamDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceH2StreamDAO.class);
private static final String GET_INSTANCE_ID_SQL = "select {0} from {1} where {2} = ? and {3} = ?";
private static final String UPDATE_HEARTBEAT_TIME_SQL = "update {0} set {1} = ? where {2} = ?";
@Override public int getInstanceId(int applicationId, String agentUUID) {
logger.info("get the application getId with application getId = {}, agentUUID = {}", applicationId, agentUUID);
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_INSTANCE_ID_SQL, InstanceTable.COLUMN_INSTANCE_ID, InstanceTable.TABLE, InstanceTable.COLUMN_APPLICATION_ID,
InstanceTable.COLUMN_AGENT_UUID);
Object[] params = new Object[] {applicationId, agentUUID};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
return rs.getInt(InstanceTable.COLUMN_INSTANCE_ID);
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return 0;
}
@Override public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
......
......@@ -23,7 +23,7 @@ import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameStreamDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
......@@ -33,8 +33,8 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class ServiceNameH2DAO extends H2DAO implements IServiceNameDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceNameH2DAO.class);
public class ServiceNameH2StreamDAO extends H2DAO implements IServiceNameStreamDAO {
private final Logger logger = LoggerFactory.getLogger(ServiceNameH2StreamDAO.class);
@Override
public int getMaxServiceId() {
......
......@@ -41,11 +41,11 @@ public abstract class AbstractWorker<INPUT extends Data, OUTPUT extends Data> im
this.cacheServiceManager = cacheServiceManager;
}
public DAOService getDaoService() {
public final DAOService getDaoService() {
return daoService;
}
public CacheServiceManager getCacheServiceManager() {
public final CacheServiceManager getCacheServiceManager() {
return cacheServiceManager;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册