Commit 5219e23d authored by clevertension

add h2 support

Parent 423efc1e
package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(CpuMetricH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
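// Positional Data accessors: integer 0 is the instance id, double 0 the cpu usage percent, long 0 the time bucket.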
Map<String, Object> source = new HashMap<>();
source.put(CpuMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("prepare cpu metric batch insert, id: {}", data.getDataString(0));
return source;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentjvm.worker.gc.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(GCMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(1));
source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0));
source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1));
source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2));
return source;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class InstanceHeartBeatH2DAO extends H2DAO implements IInstanceHeartBeatDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
H2Client client = getClient();
String sql = "select " + InstanceTable.COLUMN_INSTANCE_ID + "," + InstanceTable.COLUMN_HEARTBEAT_TIME +
" from " + InstanceTable.TABLE + " where " + InstanceTable.COLUMN_INSTANCE_ID + "=?";
Object[] params = new Object[] {id};
ResultSet rs = null;
try {
rs = client.executeQuery(sql, params);
if (rs.next()) {
Data data = dataDefine.build(id);
data.setDataInteger(0, rs.getInt(1));
data.setDataLong(0, rs.getLong(2));
return data;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
} finally {
client.closeResultSet(rs);
}
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
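// Instance rows are created during registration; the heartbeat stream only refreshes the heartbeat time, so a batch insert is never needed here.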
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getDataLong(0));
return source;
}
}
package org.skywalking.apm.collector.agentjvm.worker.memory.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MemoryMetricH2DAO extends H2DAO implements IMemoryMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
source.put(MemoryMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
return source;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentjvm.worker.memorypool.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MemoryPoolMetricH2DAO extends H2DAO implements IMemoryPoolMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
source.put(MemoryPoolMetricTable.COLUMN_POOL_TYPE, data.getDataInteger(1));
source.put(MemoryPoolMetricTable.COLUMN_INIT, data.getDataLong(0));
source.put(MemoryPoolMetricTable.COLUMN_MAX, data.getDataLong(1));
source.put(MemoryPoolMetricTable.COLUMN_USED, data.getDataLong(2));
source.put(MemoryPoolMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
source.put(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
return source;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentregister.worker.application.dao;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
private final Logger logger = LoggerFactory.getLogger(ApplicationH2DAO.class);
@Override
public int getApplicationId(String applicationCode) {
logger.info("get the application id with application code = {}", applicationCode);
String sql = "select " + ApplicationTable.COLUMN_APPLICATION_ID + " from " +
ApplicationTable.TABLE + " where " + ApplicationTable.COLUMN_APPLICATION_CODE + "='" + applicationCode + "'";
return getIntValueBySQL(sql);
}
@Override
public int getMaxApplicationId() {
return getMaxId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
}
@Override
public int getMinApplicationId() {
return getMinId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
}
@Override
public void save(ApplicationDataDefine.Application application) {
String insertSQL = "insert into " + ApplicationTable.TABLE + "(" + ApplicationTable.COLUMN_APPLICATION_ID +
"," + ApplicationTable.COLUMN_APPLICATION_CODE + ") values (?, ?)";
H2Client client = getClient();
Object[] params = new Object[] {application.getApplicationId(), application.getApplicationCode()};
try {
client.execute(insertSQL, params);
} catch (H2ClientException e) {
logger.error(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.agentstream.worker.global.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author pengys5
*/
public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
return null;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.instance.performance.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author pengys5
*/
public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(InstPerformanceH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
return null;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(NodeComponentH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
return null;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeMappingH2DAO extends H2DAO implements INodeMappingDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(NodeMappingH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
return null;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.noderef.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author pengys5
*/
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(NodeReferenceH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
return null;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.service.ServiceEntryTable;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(SegmentCostH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
return null;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author pengys5
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(SegmentH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
return null;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.service.ServiceEntryTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(ServiceEntryH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
H2Client client = getClient();
String sql = "select * from " + ServiceReferenceTable.TABLE + " where " + ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID + "=?";
Object[] params = new Object[] {id};
ResultSet rs = null;
try {
rs = client.executeQuery(sql, params);
if (rs.next()) {
Data data = dataDefine.build(id);
data.setDataInteger(0, rs.getInt(ServiceEntryTable.COLUMN_APPLICATION_ID));
data.setDataInteger(1, rs.getInt(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID));
data.setDataString(1, rs.getString(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME));
data.setDataLong(0, rs.getLong(ServiceEntryTable.COLUMN_REGISTER_TIME));
data.setDataLong(1, rs.getLong(ServiceEntryTable.COLUMN_NEWEST_TIME));
return data;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
} finally {
client.closeResultSet(rs);
}
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getDataLong(1));
return source;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getDataLong(1));
return source;
}
}
package org.skywalking.apm.collector.agentstream.worker.serviceref.dao;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceH2DAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
H2Client client = getClient();
String sql = "select * from " + ServiceReferenceTable.TABLE + " where " + ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID + "=?";
Object[] params = new Object[] {id};
ResultSet rs = null;
try {
rs = client.executeQuery(sql, params);
if (rs.next()) {
Data data = dataDefine.build(id);
data.setDataInteger(0, rs.getInt(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID));
data.setDataString(1, rs.getString(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME));
data.setDataInteger(1, rs.getInt(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID));
data.setDataString(2, rs.getString(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME));
data.setDataInteger(2, rs.getInt(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID));
data.setDataString(3, rs.getString(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME));
data.setDataLong(0, rs.getLong(ServiceReferenceTable.COLUMN_S1_LTE));
data.setDataLong(1, rs.getLong(ServiceReferenceTable.COLUMN_S3_LTE));
data.setDataLong(2, rs.getLong(ServiceReferenceTable.COLUMN_S5_LTE));
data.setDataLong(3, rs.getLong(ServiceReferenceTable.COLUMN_S5_GT));
data.setDataLong(4, rs.getLong(ServiceReferenceTable.COLUMN_SUMMARY));
data.setDataLong(5, rs.getLong(ServiceReferenceTable.COLUMN_ERROR));
data.setDataLong(6, rs.getLong(ServiceReferenceTable.COLUMN_COST_SUMMARY));
data.setDataLong(7, rs.getLong(ServiceReferenceTable.COLUMN_TIME_BUCKET));
return data;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
} finally {
client.closeResultSet(rs);
}
return null;
}
@Override public Map<String, Object> prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getDataLong(6));
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(7));
return source;
}
@Override public Map<String, Object> prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getDataLong(6));
source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(7));
return source;
}
}
......@@ -4,30 +4,35 @@ cluster:
sessionTimeout: 100000
agent_server:
jetty:
host: 0.0.0.0
port: 10800
context_path: /
agent_stream:
grpc:
host: 192.168.10.13
port: 11800
jetty:
host: 0.0.0.0
port: 12800
context_path: /
ui:
jetty:
host: 0.0.0.0
port: 12800
context_path: /
collector_inside:
grpc:
host: 127.0.0.1
port: 11800
#storage:
# elasticsearch:
# cluster_name: CollectorDBCluster
# cluster_transport_sniffer: true
# cluster_nodes: localhost:9300
# index_shards_number: 2
# index_replicas_number: 0
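# h2 below is the active storage provider in this sample configuration; the elasticsearch settings are kept above, commented out, for reference.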
storage:
h2:
url: jdbc:h2:~/collector
user_name: sa
password: sa
\ No newline at end of file
package org.skywalking.apm.collector.client.h2;
import java.sql.*;
import org.h2.jdbcx.JdbcConnectionPool;
import org.h2.util.IOUtils;
import org.skywalking.apm.collector.core.client.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -16,25 +15,49 @@ public class H2Client implements Client {
private final Logger logger = LoggerFactory.getLogger(H2Client.class);
private JdbcConnectionPool cp;
private Connection conn;
private String url;
private String userName;
private String password;
public H2Client() {
this.url = "jdbc:h2:mem:collector";
this.userName = "";
this.password = "";
}
public H2Client(String url, String userName, String password) {
this.url = url;
this.userName = userName;
this.password = password;
}
@Override public void initialize() throws H2ClientException {
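// Create a JDBC connection pool for the configured H2 database and keep one shared connection from it.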
try {
Class.forName("org.h2.Driver");
conn = DriverManager.getConnection("jdbc:h2:mem:collector");
} catch (ClassNotFoundException | SQLException e) {
cp = JdbcConnectionPool.
create(this.url, this.userName, this.password);
conn = cp.getConnection();
} catch (Exception e) {
throw new H2ClientException(e.getMessage(), e);
}
}
@Override public void shutdown() {
if (cp != null) {
cp.dispose();
}
IOUtils.closeSilently(conn);
}
public Connection getConnection() throws H2ClientException {
return conn;
}
public void execute(String sql) throws H2ClientException {
Statement statement = null;
try {
statement = getConnection().createStatement();
statement.execute(sql);
statement.closeOnCompletion();
} catch (SQLException e) {
......@@ -42,17 +65,51 @@ public class H2Client implements Client {
}
}
public ResultSet executeQuery(String sql, Object[] params) throws H2ClientException {
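// Run a query with positional parameters; callers must release the returned ResultSet through closeResultSet().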
logger.info("execute query with result: {}", sql);
PreparedStatement statement;
ResultSet rs;
try {
statement = getConnection().prepareStatement(sql);
if (params != null) {
for (int i = 0; i < params.length; i++) {
statement.setObject(i+1, params[i]);
}
}
rs = statement.executeQuery();
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
return rs;
}
public boolean execute(String sql, Object[] params) throws H2ClientException {
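// Run an insert/update/delete with positional parameters; the return value reports whether the statement produced a ResultSet.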
logger.info("execute insert/update/delete: {}", sql);
PreparedStatement statement;
boolean flag;
try {
statement = getConnection().prepareStatement(sql);
if (params != null) {
for (int i = 0; i < params.length; i++) {
statement.setObject(i+1, params[i]);
}
}
flag = statement.execute();
statement.closeOnCompletion();
} catch (SQLException e) {
throw new H2ClientException(e.getMessage(), e);
}
return flag;
}
public void closeResultSet(ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
}
}
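For orientation, a minimal sketch of how the pooled client above can be driven once this commit is in place; the demo class, table name and column name are illustrative only and not part of the commit:
import java.sql.ResultSet;
import java.sql.SQLException;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
public class H2ClientDemo {
public static void main(String[] args) {
// connection settings mirror the sample application.yml in this commit
H2Client client = new H2Client("jdbc:h2:~/collector", "sa", "sa");
ResultSet rs = null;
try {
client.initialize();
// positional parameters would go in the Object[]; null means no parameters
rs = client.executeQuery("select max(instance_id) from instance", null);
if (rs.next()) {
System.out.println("max instance id: " + rs.getInt(1));
}
} catch (H2ClientException | SQLException e) {
e.printStackTrace();
} finally {
client.closeResultSet(rs);
client.shutdown();
}
}
}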
......@@ -60,6 +60,14 @@ public class ZookeeperClient implements Client {
}
}
public void delete(final String path, int version) throws ZookeeperClientException {
try {
zk.delete(path, version);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
public byte[] getData(String path, boolean watch, Stat stat) throws ZookeeperClientException {
try {
return zk.getData(path, watch, stat);
......
......@@ -97,9 +97,15 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
client.getChildren(next.getKey(), true);
String serverPath = next.getKey() + "/" + value.getHostPort();
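// If this host:port is already registered from a previous run, remove the stale node before registering again.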
Stat stat = client.exists(serverPath, false);
if (stat != null) {
client.delete(serverPath, stat.getVersion());
}
stat = client.exists(serverPath, false);
if (stat == null) {
setData(serverPath, contextPath);
} else {
client.delete(serverPath, stat.getVersion());
throw new ClusterNodeExistException("current address: " + value.getHostPort() + " has been registered, check the host and port configuration or wait a moment.");
}
}
......
package org.skywalking.apm.collector.storage.h2;
public class StorageH2Config {
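// Populated by StorageH2ConfigParser from the storage.h2 section and read when the H2 client is created.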
public static String URL;
public static String USER_NAME;
public static String PASSWORD;
}
......@@ -3,12 +3,25 @@ package org.skywalking.apm.collector.storage.h2;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class StorageH2ConfigParser implements ModuleConfigParser {
private static final String URL = "url";
public static final String USER_NAME = "user_name";
public static final String PASSWORD = "password";
@Override public void parse(Map config) throws ConfigParseException {
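// Only override a value when the corresponding key is present and non-empty in the configuration map.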
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(URL))) {
StorageH2Config.URL = (String)config.get(URL);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(USER_NAME))) {
StorageH2Config.USER_NAME = (String)config.get(USER_NAME);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(PASSWORD))) {
StorageH2Config.PASSWORD = (String)config.get(PASSWORD);
}
}
}
......@@ -33,7 +33,7 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
}
@Override protected Client createClient() {
return new H2Client(StorageH2Config.URL, StorageH2Config.USER_NAME, StorageH2Config.PASSWORD);
}
@Override public StorageInstaller storageInstaller() {
......
package org.skywalking.apm.collector.storage.h2.dao;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.dao.DAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* @author pengys5
*/
public abstract class H2DAO extends DAO<H2Client> {
private final Logger logger = LoggerFactory.getLogger(H2DAO.class);
public final int getMaxId(String tableName, String columnName) {
String sql = "select max(" + columnName + ") from " + tableName;
return getIntValueBySQL(sql);
}
public final int getMinId(String tableName, String columnName) {
String sql = "select min(" + columnName + ") from " + tableName;
return getIntValueBySQL(sql);
}
public final int getIntValueBySQL(String sql) {
H2Client client = getClient();
ResultSet rs = null;
try {
rs = client.executeQuery(sql, null);
if (rs.next()) {
int id = rs.getInt(1);
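// An integer extreme means no meaningful id was found, so fall back to 0.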
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
} finally {
client.closeResultSet(rs);
}
return 0;
}
}
package org.skywalking.apm.collector.storage.h2.define;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.h2.util.IOUtils;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.client.Client;
......@@ -8,12 +12,16 @@ import org.skywalking.apm.collector.core.storage.StorageException;
import org.skywalking.apm.collector.core.storage.StorageInstallException;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.core.storage.TableDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class H2StorageInstaller extends StorageInstaller {
private final Logger logger = LoggerFactory.getLogger(H2StorageInstaller.class);
@Override protected void defineFilter(List<TableDefine> tableDefines) {
int size = tableDefines.size();
for (int i = size - 1; i >= 0; i--) {
......@@ -24,11 +32,36 @@ public class H2StorageInstaller extends StorageInstaller {
}
@Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
H2Client h2Client = (H2Client)client;
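// H2 folds unquoted identifiers to upper case, so the table is looked up in the metadata by its upper-cased name.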
ResultSet rs = null;
try {
logger.info("check if table {} exist ", tableDefine.getName());
rs = h2Client.getConnection().getMetaData().getTables(null, null, tableDefine.getName().toUpperCase(), null);
if (rs.next()) {
return true;
}
} catch (SQLException | H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
} finally {
try {
if (rs != null) {
rs.close();
}
} catch (SQLException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
return false;
}
@Override protected boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException {
H2Client h2Client = (H2Client)client;
try {
h2Client.execute("drop table if exists " + tableDefine.getName());
return true;
} catch (H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
@Override protected boolean createTable(Client client, TableDefine tableDefine) throws StorageException {
......@@ -41,14 +74,16 @@ public class H2StorageInstaller extends StorageInstaller {
h2TableDefine.getColumnDefines().forEach(columnDefine -> {
H2ColumnDefine h2ColumnDefine = (H2ColumnDefine)columnDefine;
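// VARCHAR columns get a fixed length of 255; every column definition ends with a comma that is trimmed after the loop.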
if (h2ColumnDefine.getType().equals(H2ColumnDefine.Type.Varchar.name())) {
sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType()).append("(255),");
} else {
sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType()).append(",");
}
});
//remove last comma
sqlBuilder.delete(sqlBuilder.length() - 1, sqlBuilder.length());
sqlBuilder.append(")");
try {
logger.info("create h2 table with sql {}", sqlBuilder);
h2Client.execute(sqlBuilder.toString());
} catch (H2ClientException e) {
throw new StorageInstallException(e.getMessage(), e);
......