提交 e4f56ab1 编写于 作者: P pengys5

add Dao container to cache Dao

上级 5bd5a69b
package org.skywalking.apm.collector.agentregister.application;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
......@@ -10,6 +15,17 @@ public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
@Override public int getApplicationId(String applicationCode) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationRegisterTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ApplicationRegisterTable.COLUMN_APPLICATION_CODE, applicationCode));
searchRequestBuilder.setSize(10);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
return searchResponse.getHits().getAt(0).getField(ApplicationRegisterTable.COLUMN_APPLICATION_ID).getValue();
}
return 0;
}
}
......@@ -10,6 +10,6 @@ public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
@Override public int getApplicationId(String applicationCode) {
H2Client client = getClient();
return 0;
return 100;
}
}
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
/**
* @author pengys5
*/
public class ApplicationIDGetOrCreate {
private IApplicationDAO applicationDAO;
public int getOrCreate(String applicationCode) {
return 0;
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
return dao.getApplicationId(applicationCode);
}
}
......@@ -8,22 +8,27 @@ import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithIntegerValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationRegisterServiceHandler extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);
private ApplicationIDGetOrCreate applicationIDGetOrCreate = new ApplicationIDGetOrCreate();
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
logger.debug("register application");
ProtocolStringList applicationCodes = request.getApplicationCodeList();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = applicationIDGetOrCreate.getOrCreate(applicationCode);
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
ApplicationMapping mapping = ApplicationMapping.newBuilder().setApplication(i, value).build();
ApplicationMapping mapping = ApplicationMapping.newBuilder().addApplication(i, value).build();
responseObserver.onNext(mapping);
}
responseObserver.onCompleted();
......
org.skywalking.apm.collector.agentregister.application.ApplicationEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentregister.application.ApplicationH2DAO
\ No newline at end of file
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.Test;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
/**
* @author pengys5
*/
public class ApplicationRegisterServiceHandlerTestCase {
private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub;
@Test
public void testRegister() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode("test").build();
ApplicationMapping mapping = stub.register(application);
System.out.println(mapping.getApplication(0).getKey() + ", " + mapping.getApplication(0).getValue());
}
}
......@@ -8,6 +8,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
......@@ -101,6 +102,10 @@ public class ElasticSearchClient implements Client {
return response.isExists();
}
public SearchRequestBuilder prepareSearch(String indexName) {
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName) {
return null;
}
......
package org.skywalking.apm.collector.core.config;
import org.junit.Test;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleDefineException;
/**
* @author pengys5
*/
public class ModuleConfigLoaderTestCase {
@Test
public void testLoad() throws ModuleDefineException {
ModuleConfigLoader loader = new ModuleConfigLoader();
loader.load();
}
}
......@@ -37,12 +37,12 @@ public class GRPCServer implements Server {
@Override public void initialize() throws ServerException {
InetSocketAddress address = new InetSocketAddress(host, port);
nettyServerBuilder = NettyServerBuilder.forAddress(address);
server = nettyServerBuilder.build();
logger.info("Server started, host {} listening on {}", host, port);
}
@Override public void start() throws ServerException {
try {
server = nettyServerBuilder.build();
server.start();
} catch (IOException e) {
throw new GRPCServerException(e.getMessage(), e);
......
package org.skywalking.apm.collector.storage.dao;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public enum DAOContainer {
INSTANCE;
private Map<String, DAO> daos = new HashMap<>();
public void put(String interfaceName, DAO dao) {
daos.put(interfaceName, dao);
}
public DAO get(String interfaceName) {
return daos.get(interfaceName);
}
}
......@@ -9,6 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.storage.StorageModuleDefine;
import org.skywalking.apm.collector.storage.StorageModuleGroupDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAODefineLoader;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchStorageInstaller;
......@@ -45,6 +46,8 @@ public class StorageElasticSearchModuleDefine extends StorageModuleDefine {
List<EsDAO> esDAOs = loader.load();
esDAOs.forEach(esDAO -> {
esDAO.setClient((ElasticSearchClient)client);
String interFaceName = esDAO.getClass().getInterfaces()[0].getName();
DAOContainer.INSTANCE.put(interFaceName, esDAO);
});
}
}
......@@ -9,6 +9,7 @@ import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.storage.StorageModuleDefine;
import org.skywalking.apm.collector.storage.StorageModuleGroupDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.dao.H2DAODefineLoader;
import org.skywalking.apm.collector.storage.h2.define.H2StorageInstaller;
......@@ -45,6 +46,8 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
List<H2DAO> h2DAOs = loader.load();
h2DAOs.forEach(h2DAO -> {
h2DAO.setClient((H2Client)client);
String interFaceName = h2DAO.getClass().getInterfaces()[0].getName();
DAOContainer.INSTANCE.put(interFaceName, h2DAO);
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册