diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/dao/CpuMetricH2DAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/dao/CpuMetricH2DAO.java
index cdd08577d6f3ea2478bf4789ec3968a6a17db46a..065f4c9184d7a1831043e84e80fea7c8b7cbb4dc 100644
--- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/dao/CpuMetricH2DAO.java
+++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/cpu/dao/CpuMetricH2DAO.java
@@ -1,9 +1,37 @@
 package org.skywalking.apm.collector.agentjvm.worker.cpu.dao;
 
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
+import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
  */
-public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO {
+public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(CpuMetricH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(CpuMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
+        source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
+        source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
+
+        logger.debug("prepare cpu metric batch insert, id: {}", data.getDataString(0));
+        return source;
+    }
+
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/dao/GCMetricH2DAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/dao/GCMetricH2DAO.java
index c1d5f1e70fd7b5f66e455b9935e4dab3587d3f4b..b58a401588bbe5885edd83ca3874f82692d29e01 100644
--- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/dao/GCMetricH2DAO.java
+++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/gc/dao/GCMetricH2DAO.java
@@ -1,9 +1,35 @@
 package org.skywalking.apm.collector.agentjvm.worker.gc.dao;
 
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
+import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
 */
-public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO {
+public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(GCMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
+        source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(1));
+        source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0));
+        source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1));
+        source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2));
+
+        return source;
+    }
+
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java
index 2a63de395f6abc6c11f3747253aff286213259ca..fc10e863e58a527a21b96b88c8fab86e2395feab 100644
--- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java
+++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/heartbeat/dao/InstanceHeartBeatH2DAO.java
@@ -1,9 +1,57 @@
 package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao;
 
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.skywalking.apm.collector.client.h2.H2Client;
+import org.skywalking.apm.collector.client.h2.H2ClientException;
+import org.skywalking.apm.collector.core.framework.UnexpectedException;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.skywalking.apm.collector.storage.define.register.InstanceTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
 */
-public class InstanceHeartBeatH2DAO extends H2DAO implements IInstanceHeartBeatDAO {
+public class InstanceHeartBeatH2DAO extends H2DAO implements IInstanceHeartBeatDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatH2DAO.class);
+
+    @Override public Data get(String id, DataDefine dataDefine) {
+        H2Client client = getClient();
+        String sql = "select " + InstanceTable.COLUMN_INSTANCE_ID + "," + InstanceTable.COLUMN_HEARTBEAT_TIME
+            + " from " + InstanceTable.TABLE + " where " + InstanceTable.COLUMN_INSTANCE_ID + "=?";
+        Object[] params = new Object[] {id};
+        ResultSet rs = null;
+        try {
+            rs = client.executeQuery(sql, params);
+            Data data = dataDefine.build(id);
+            data.setDataInteger(0, rs.getInt(1));
+            data.setDataLong(0, rs.getLong(2));
+            return data;
+        } catch (SQLException | H2ClientException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            client.closeResultSet(rs);
+        }
+        return null;
+    }
+
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        throw new UnexpectedException("There is no need to merge stream data with database data.");
+    }
+
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getDataLong(0));
+        return source;
+    }
 }
diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/dao/MemoryMetricH2DAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/dao/MemoryMetricH2DAO.java
index 16f6d407b6324d8282d2180cca8ef789a62947ec..3afaeaac83a62369be0dcfcf8b1c4276c325acd6 100644
--- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/dao/MemoryMetricH2DAO.java
+++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memory/dao/MemoryMetricH2DAO.java
@@ -1,9 +1,36 @@
 package org.skywalking.apm.collector.agentjvm.worker.memory.dao;
 
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class MemoryMetricH2DAO extends H2DAO implements IMemoryMetricDAO {
+public class MemoryMetricH2DAO extends H2DAO implements IMemoryMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(MemoryMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
+        source.put(MemoryMetricTable.COLUMN_IS_HEAP, data.getDataBoolean(0));
+        source.put(MemoryMetricTable.COLUMN_INIT, data.getDataLong(0));
+        source.put(MemoryMetricTable.COLUMN_MAX, data.getDataLong(1));
+        source.put(MemoryMetricTable.COLUMN_USED, data.getDataLong(2));
+        source.put(MemoryMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
+        source.put(MemoryMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
+
+        return source;
+    }
+
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/dao/MemoryPoolMetricH2DAO.java b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/dao/MemoryPoolMetricH2DAO.java
index 200f9bb160f7b05f7936cbbb4cade797f7473e70..7cbd236bb074f6b7fe2e74551980e9545eeb992f 100644
--- a/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/dao/MemoryPoolMetricH2DAO.java
+++ b/apm-collector/apm-collector-agentjvm/src/main/java/org/skywalking/apm/collector/agentjvm/worker/memorypool/dao/MemoryPoolMetricH2DAO.java
@@ -1,9 +1,36 @@
 package org.skywalking.apm.collector.agentjvm.worker.memorypool.dao;
 
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class MemoryPoolMetricH2DAO extends H2DAO implements IMemoryPoolMetricDAO {
+public class MemoryPoolMetricH2DAO extends H2DAO implements IMemoryPoolMetricDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(MemoryPoolMetricTable.COLUMN_INSTANCE_ID, data.getDataInteger(0));
+        source.put(MemoryPoolMetricTable.COLUMN_POOL_TYPE, data.getDataInteger(1));
+        source.put(MemoryPoolMetricTable.COLUMN_INIT, data.getDataLong(0));
+        source.put(MemoryPoolMetricTable.COLUMN_MAX, data.getDataLong(1));
+        source.put(MemoryPoolMetricTable.COLUMN_USED, data.getDataLong(2));
+        source.put(MemoryPoolMetricTable.COLUMN_COMMITTED, data.getDataLong(3));
+        source.put(MemoryPoolMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(4));
+
+        return source;
+    }
+
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/dao/ApplicationH2DAO.java b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/dao/ApplicationH2DAO.java
index 0081c029492c69e54dd2ee66f84d8bcf3c42bb56..53337184ffb1bf2fd386dc24c535791874826c92 100644
--- a/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/dao/ApplicationH2DAO.java
+++ b/apm-collector/apm-collector-agentregister/src/main/java/org/skywalking/apm/collector/agentregister/worker/application/dao/ApplicationH2DAO.java
@@ -1,28 +1,47 @@
 package org.skywalking.apm.collector.agentregister.worker.application.dao;
 
+import org.skywalking.apm.collector.client.h2.H2ClientException;
 import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
 import org.skywalking.apm.collector.client.h2.H2Client;
+import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author pengys5
*/
 public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
+    private final Logger logger = LoggerFactory.getLogger(ApplicationH2DAO.class);
+    @Override
+    public int getApplicationId(String applicationCode) {
+        logger.info("get the application id with application code = {}", applicationCode);
+        String sql = "select " + ApplicationTable.COLUMN_APPLICATION_ID + " from " +
+            ApplicationTable.TABLE + " where " + ApplicationTable.COLUMN_APPLICATION_CODE + "='" + applicationCode + "'";
 
-    @Override public int getApplicationId(String applicationCode) {
-        H2Client client = getClient();
-        return 100;
+        return getIntValueBySQL(sql);
     }
 
-    @Override public int getMaxApplicationId() {
-        return 0;
+    @Override
+    public int getMaxApplicationId() {
+        return getMaxId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
     }
 
-    @Override public int getMinApplicationId() {
-        return 0;
+    @Override
+    public int getMinApplicationId() {
+        return getMinId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
     }
 
-    @Override public void save(ApplicationDataDefine.Application application) {
-
+    @Override
+    public void save(ApplicationDataDefine.Application application) {
+        String insertSQL = "insert into " + ApplicationTable.TABLE + "(" + ApplicationTable.COLUMN_APPLICATION_ID +
+            "," + ApplicationTable.COLUMN_APPLICATION_CODE + ") values (?, ?)";
+        H2Client client = getClient();
+        Object[] params = new Object[] {application.getApplicationId(), application.getApplicationCode()};
+        try {
+            client.execute(insertSQL, params);
+        } catch (H2ClientException e) {
+            logger.error(e.getMessage(), e);
+        }
     }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/dao/GlobalTraceH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/dao/GlobalTraceH2DAO.java
index ea48e612f3c99e055f47be76676f643ec0a86e3e..d17002aa0aa3a76f0a4c2e1a433d5650d609758e 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/dao/GlobalTraceH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/global/dao/GlobalTraceH2DAO.java
@@ -1,9 +1,27 @@
 package org.skywalking.apm.collector.agentstream.worker.global.dao;
 
+import org.skywalking.apm.collector.agentstream.worker.instance.performance.dao.InstPerformanceH2DAO;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO {
+public class GlobalTraceH2DAO extends H2DAO implements IGlobalTraceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(GlobalTraceH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/dao/InstPerformanceH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/dao/InstPerformanceH2DAO.java
index 9dbe1800c8e7e17bac2203beaba3b3e463885919..bb7b9fb040f647ca2f9e36abcfcae976e79295ce 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/dao/InstPerformanceH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/instance/performance/dao/InstPerformanceH2DAO.java
@@ -1,10 +1,27 @@
 package org.skywalking.apm.collector.agentstream.worker.instance.performance.dao;
 
+import org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO {
-
+public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(InstPerformanceH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java
index 5e08b344508fa1a1e09627c38b6c88fe541d54e8..870f656e66855374d7a0bf271bda08276abc0667 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/component/dao/NodeComponentH2DAO.java
@@ -1,10 +1,26 @@
 package org.skywalking.apm.collector.agentstream.worker.node.component.dao;
 
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO {
-
+public class NodeComponentH2DAO extends H2DAO implements INodeComponentDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(NodeComponentH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/dao/NodeMappingH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/dao/NodeMappingH2DAO.java
index 045d0738e6f7391b8937505c20721d6716e14fa4..e5cb8709a886f5a9fa25dcbfbc1cdb73e1ddcd10 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/dao/NodeMappingH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/mapping/dao/NodeMappingH2DAO.java
@@ -1,9 +1,27 @@
 package org.skywalking.apm.collector.agentstream.worker.node.mapping.dao;
 
+import org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class NodeMappingH2DAO extends H2DAO implements INodeMappingDAO {
+public class NodeMappingH2DAO extends H2DAO implements INodeMappingDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(NodeComponentH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/dao/NodeReferenceH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/dao/NodeReferenceH2DAO.java
index 30a206c3ec50434373644963495797889a44c347..7fe20ff63863fb3510fd394ef370739eae87d314 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/dao/NodeReferenceH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/noderef/dao/NodeReferenceH2DAO.java
@@ -1,9 +1,27 @@
 package org.skywalking.apm.collector.agentstream.worker.noderef.dao;
 
+import org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO {
+public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(NodeReferenceH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/dao/SegmentCostH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/dao/SegmentCostH2DAO.java
index 63a6c78c54f1a047ee818b0d2ba5d0078bb91c58..1fb2e71810aec5e7a8b9f1f51525849247330a98 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/dao/SegmentCostH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/cost/dao/SegmentCostH2DAO.java
@@ -1,10 +1,34 @@
 package org.skywalking.apm.collector.agentstream.worker.segment.cost.dao;
 
+import org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO;
+import org.skywalking.apm.collector.client.h2.H2Client;
+import org.skywalking.apm.collector.client.h2.H2ClientException;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.skywalking.apm.collector.storage.define.service.ServiceEntryTable;
+import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO {
-
+public class SegmentCostH2DAO extends H2DAO implements ISegmentCostDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(SegmentCostH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/dao/SegmentH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/dao/SegmentH2DAO.java
index 363088b87d633e01ae46fb038078f3534c65bd98..37054c67bf55cba2f434381e29d0a58b077cef3a 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/dao/SegmentH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/segment/origin/dao/SegmentH2DAO.java
@@ -1,9 +1,27 @@
 package org.skywalking.apm.collector.agentstream.worker.segment.origin.dao;
 
+import org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class SegmentH2DAO extends H2DAO implements ISegmentDAO {
+public class SegmentH2DAO extends H2DAO implements ISegmentDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(SegmentCostH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        return null;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/dao/ServiceEntryH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/dao/ServiceEntryH2DAO.java
index fb2cd96c7a3c93371d22b9239a3c3b73a4fd096c..a2650151b356c9bb31b78b86822991e0dd79f210 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/dao/ServiceEntryH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/service/entry/dao/ServiceEntryH2DAO.java
@@ -1,9 +1,64 @@
 package org.skywalking.apm.collector.agentstream.worker.service.entry.dao;
 
+import org.skywalking.apm.collector.agentstream.worker.serviceref.dao.ServiceReferenceH2DAO;
+import org.skywalking.apm.collector.client.h2.H2Client;
+import org.skywalking.apm.collector.client.h2.H2ClientException;
+import org.skywalking.apm.collector.core.stream.Data;
+import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.skywalking.apm.collector.storage.define.service.ServiceEntryTable;
+import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
+import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO {
+public class ServiceEntryH2DAO extends H2DAO implements IServiceEntryDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(ServiceEntryH2DAO.class);
+    @Override public Data get(String id, DataDefine dataDefine) {
+        H2Client client = getClient();
+        String sql = "select * from " + ServiceReferenceTable.TABLE + " where " + ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID + "=?";
+        Object[] params = new Object[] {id};
+        ResultSet rs = null;
+        try {
+            rs = client.executeQuery(sql, params);
+            Data data = dataDefine.build(id);
+            data.setDataInteger(0, rs.getInt(ServiceEntryTable.COLUMN_APPLICATION_ID));
+            data.setDataInteger(1, rs.getInt(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID));
+            data.setDataString(1, rs.getString(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME));
+            data.setDataLong(0, rs.getLong(ServiceEntryTable.COLUMN_REGISTER_TIME));
+            data.setDataLong(1, rs.getLong(ServiceEntryTable.COLUMN_NEWEST_TIME));
+            return data;
+        } catch (SQLException | H2ClientException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            client.closeResultSet(rs);
+        }
+        return null;
+    }
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
+        source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
+        source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
+        source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
+        source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getDataLong(1));
+        return source;
+    }
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(ServiceEntryTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
+        source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(1));
+        source.put(ServiceEntryTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
+        source.put(ServiceEntryTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
+        source.put(ServiceEntryTable.COLUMN_NEWEST_TIME, data.getDataLong(1));
+        return source;
+    }
 }
diff --git a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/dao/ServiceReferenceH2DAO.java b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/dao/ServiceReferenceH2DAO.java
index 33d2957de5563501f5c66c46e6c1349ad7abcccb..6b17bf8ec3c4885020cbb4676254d05e3caab29c 100644
--- a/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/dao/ServiceReferenceH2DAO.java
+++ b/apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/serviceref/dao/ServiceReferenceH2DAO.java
@@ -1,24 +1,94 @@
 package org.skywalking.apm.collector.agentstream.worker.serviceref.dao;
 
+import org.skywalking.apm.collector.client.h2.H2Client;
+import org.skywalking.apm.collector.client.h2.H2ClientException;
+import org.skywalking.apm.collector.storage.define.register.InstanceTable;
+import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
 import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
 import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
 import org.skywalking.apm.collector.core.stream.Data;
 import org.skywalking.apm.collector.storage.define.DataDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author pengys5
*/
-public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO, IPersistenceDAO<String, String> {
-
+public class ServiceReferenceH2DAO extends H2DAO implements IServiceReferenceDAO, IPersistenceDAO<Map<String, Object>, Map<String, Object>> {
+    private final Logger logger = LoggerFactory.getLogger(ServiceReferenceH2DAO.class);
     @Override public Data get(String id, DataDefine dataDefine) {
+        H2Client client = getClient();
+        String sql = "select * from " + ServiceReferenceTable.TABLE + " where " + ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID + "=?";
+        Object[] params = new Object[] {id};
+        ResultSet rs = null;
+        try {
+            rs = client.executeQuery(sql, params);
+            Data data = dataDefine.build(id);
+            data.setDataInteger(0, rs.getInt(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID));
+            data.setDataString(1, rs.getString(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME));
+            data.setDataInteger(1, rs.getInt(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID));
+            data.setDataString(2, rs.getString(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME));
+            data.setDataInteger(2, rs.getInt(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID));
+            data.setDataString(3, rs.getString(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME));
+            data.setDataLong(0, rs.getLong(ServiceReferenceTable.COLUMN_S1_LTE));
+            data.setDataLong(1, rs.getLong(ServiceReferenceTable.COLUMN_S3_LTE));
+            data.setDataLong(2, rs.getLong(ServiceReferenceTable.COLUMN_S5_LTE));
+            data.setDataLong(3, rs.getLong(ServiceReferenceTable.COLUMN_S5_GT));
+            data.setDataLong(4, rs.getLong(ServiceReferenceTable.COLUMN_SUMMARY));
+            data.setDataLong(5, rs.getLong(ServiceReferenceTable.COLUMN_ERROR));
+            data.setDataLong(6, rs.getLong(ServiceReferenceTable.COLUMN_COST_SUMMARY));
+            data.setDataLong(7, rs.getLong(ServiceReferenceTable.COLUMN_TIME_BUCKET));
+            return data;
+        } catch (SQLException | H2ClientException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            client.closeResultSet(rs);
+        }
         return null;
     }
 
-    @Override public String prepareBatchInsert(Data data) {
-        return null;
+    @Override public Map<String, Object> prepareBatchInsert(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
+        source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
+        source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
+        source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
+        source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
+        source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
+        source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
+        source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
+        source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
+        source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
+        source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
+        source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
+        source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getDataLong(6));
+        source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(7));
+
+        return source;
     }
 
-    @Override public String prepareBatchUpdate(Data data) {
-        return null;
+    @Override public Map<String, Object> prepareBatchUpdate(Data data) {
+        Map<String, Object> source = new HashMap<>();
+        source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, data.getDataInteger(0));
+        source.put(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_NAME, data.getDataString(1));
+        source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, data.getDataInteger(1));
+        source.put(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME, data.getDataString(2));
+        source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, data.getDataInteger(2));
+        source.put(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME, data.getDataString(3));
+        source.put(ServiceReferenceTable.COLUMN_S1_LTE, data.getDataLong(0));
+        source.put(ServiceReferenceTable.COLUMN_S3_LTE, data.getDataLong(1));
+        source.put(ServiceReferenceTable.COLUMN_S5_LTE, data.getDataLong(2));
+        source.put(ServiceReferenceTable.COLUMN_S5_GT, data.getDataLong(3));
+        source.put(ServiceReferenceTable.COLUMN_SUMMARY, data.getDataLong(4));
+        source.put(ServiceReferenceTable.COLUMN_ERROR, data.getDataLong(5));
+        source.put(ServiceReferenceTable.COLUMN_COST_SUMMARY, data.getDataLong(6));
+        source.put(ServiceReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(7));
+
+        return source;
     }
 }
diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml
index c9e94f239ed91b72ccd630abfe4ce736fb0e3988..e8a4837c7db4271e0f377f09536aabaa67f28337 100644
--- a/apm-collector/apm-collector-boot/src/main/resources/application.yml
+++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml
@@ -4,30 +4,35 @@ cluster:
     sessionTimeout: 100000
 agent_server:
   jetty:
-    host: localhost
+    host: 0.0.0.0
     port: 10800
     context_path: /
 agent_stream:
   grpc:
-    host: localhost
+    host: 192.168.10.13
     port: 11800
   jetty:
-    host: localhost
+    host: 0.0.0.0
     port: 12800
     context_path: /
 ui:
   jetty:
-    host: localhost
+    host: 0.0.0.0
     port: 12800
     context_path: /
 collector_inside:
   grpc:
-    host: localhost
+    host: 127.0.0.1
     port: 11800
+#storage:
+#  elasticsearch:
+#    cluster_name: CollectorDBCluster
+#    cluster_transport_sniffer: true
+#    cluster_nodes: localhost:9300
+#    index_shards_number: 2
+#    index_replicas_number: 0
 storage:
-  elasticsearch:
-    cluster_name: CollectorDBCluster
-    cluster_transport_sniffer: true
-    cluster_nodes: localhost:9300
-    index_shards_number: 2
-    index_replicas_number: 0
+  h2:
+    url: jdbc:h2:~/collector
+    user_name: sa
+    password: sa
\ No newline at end of file
diff --git a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java
index 65960cbef831a54ad945aad94aa532cbb32da05f..0353837169c75ce4a348b3eeadb808d990fb45f8 100644
--- a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java
+++ b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/h2/H2Client.java
@@ -1,10 +1,9 @@
 package org.skywalking.apm.collector.client.h2;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
+
+import org.h2.jdbcx.JdbcConnectionPool;
+import org.h2.util.IOUtils;
 import org.skywalking.apm.collector.core.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -16,25 +15,49 @@ public class H2Client implements Client {
 
     private final Logger logger = LoggerFactory.getLogger(H2Client.class);
 
+    private JdbcConnectionPool cp;
     private Connection conn;
+    private String url;
+    private String userName;
+    private String password;
+
+    public H2Client() {
+        this.url = "jdbc:h2:mem:collector";
+        this.userName = "";
+        this.password = "";
+    }
+
+    public H2Client(String url, String userName, String password) {
+        this.url = url;
+        this.userName = userName;
+        this.password = password;
+    }
 
     @Override public void initialize() throws H2ClientException {
         try {
-            Class.forName("org.h2.Driver");
-            conn = DriverManager.getConnection("jdbc:h2:mem:collector");
-        } catch (ClassNotFoundException | SQLException e) {
+            cp = JdbcConnectionPool.
+                create(this.url, this.userName, this.password);
+            conn = cp.getConnection();
+        } catch (Exception e) {
             throw new H2ClientException(e.getMessage(), e);
         }
     }
 
     @Override public void shutdown() {
+        if (cp != null) {
+            cp.dispose();
+        }
+        IOUtils.closeSilently(conn);
+    }
+    public Connection getConnection() throws H2ClientException {
+        return conn;
     }
 
     public void execute(String sql) throws H2ClientException {
         Statement statement = null;
         try {
-            statement = conn.createStatement();
+            statement = getConnection().createStatement();
             statement.execute(sql);
             statement.closeOnCompletion();
         } catch (SQLException e) {
@@ -42,17 +65,51 @@ public class H2Client implements Client {
         }
     }
 
-    public void executeQuery(String sql) throws H2ClientException {
-        Statement statement = null;
+    public ResultSet executeQuery(String sql, Object[] params) throws H2ClientException {
+        logger.info("execute query with result: {}", sql);
+        PreparedStatement statement;
+        ResultSet rs;
+        try {
+            statement = getConnection().prepareStatement(sql);
+            if (params != null) {
+                for (int i = 0; i < params.length; i++) {
+                    statement.setObject(i + 1, params[i]);
+                }
+            }
+            rs = statement.executeQuery();
+            statement.closeOnCompletion();
+        } catch (SQLException e) {
+            throw new H2ClientException(e.getMessage(), e);
+        }
+        return rs;
+    }
+
+    public boolean execute(String sql, Object[] params) throws H2ClientException {
+        logger.info("execute insert/update/delete: {}", sql);
+        PreparedStatement statement;
+        boolean flag;
         try {
-            statement = conn.createStatement();
-            ResultSet rs = statement.executeQuery(sql);
-            while (rs.next()) {
-                logger.debug(rs.getString("ADDRESS") + "," + rs.getString("DATA"));
+            statement = getConnection().prepareStatement(sql);
+            if (params != null) {
+                for (int i = 0; i < params.length; i++) {
+                    statement.setObject(i + 1, params[i]);
+                }
             }
+            flag = statement.execute();
             statement.closeOnCompletion();
         } catch (SQLException e) {
             throw new H2ClientException(e.getMessage(), e);
         }
+        return flag;
+    }
+
+    public void closeResultSet(ResultSet rs) {
+        if (rs != null) {
+            try {
+                rs.close();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
     }
 }
diff --git a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java
index 3077cc958cdede1d382a9c813e205a1e44e9a038..7bd2bdab8c52e61dbfbb1b764caf4ca4d866ee61 100644
--- a/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java
+++ b/apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/zookeeper/ZookeeperClient.java
@@ -60,6 +60,14 @@ public class ZookeeperClient implements Client {
         }
     }
 
+    public void delete(final String path, int version) throws ZookeeperClientException {
+        try {
+            zk.delete(path, version);
+        } catch (KeeperException | InterruptedException e) {
+            throw new ZookeeperClientException(e.getMessage(), e);
+        }
+    }
+
     public byte[] getData(String path, boolean watch, Stat stat) throws ZookeeperClientException {
         try {
             return zk.getData(path, watch, stat);
diff --git a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java
index c209852e0c12bda8b4d1ceabbe8da980378b013a..3238fe63e3ee01b056f63f5cd8d67a37673a840a 100644
--- a/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java
+++ b/apm-collector/apm-collector-cluster/src/main/java/org/skywalking/apm/collector/cluster/zookeeper/ClusterZKDataMonitor.java
@@ -97,9 +97,15 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
                 client.getChildren(next.getKey(), true);
                 String serverPath = next.getKey() + "/" + value.getHostPort();
 
-                if (client.exists(serverPath, false) == null) {
+                Stat stat = client.exists(serverPath, false);
+                if (stat != null) {
+                    client.delete(serverPath, stat.getVersion());
+                }
+                stat = client.exists(serverPath, false);
+                if (stat == null) {
                     setData(serverPath, contextPath);
                 } else {
+                    client.delete(serverPath, stat.getVersion());
                     throw new ClusterNodeExistException("current address: " + value.getHostPort() + " has been registered, check the host and port configuration or wait a moment.");
                 }
             }
diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2Config.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2Config.java
new file mode 100644
index 0000000000000000000000000000000000000000..29fdee86d7158f4c692f3bf3b06c45fcbf87c87b
--- /dev/null
+++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2Config.java
@@ -0,0 +1,7 @@
+package org.skywalking.apm.collector.storage.h2;
+
+public class StorageH2Config {
+    public static String URL;
+    public static String USER_NAME;
+    public static String PASSWORD;
+}
diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ConfigParser.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ConfigParser.java
index 72f7b561f863c9a53ac59ccccdf591643814ef45..f4c0f3379ff407da7876507d1010b6892d456181 100644
--- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ConfigParser.java
+++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ConfigParser.java
@@ -3,12 +3,25 @@ package org.skywalking.apm.collector.storage.h2;
 import java.util.Map;
 import org.skywalking.apm.collector.core.config.ConfigParseException;
 import org.skywalking.apm.collector.core.module.ModuleConfigParser;
+import org.skywalking.apm.collector.core.util.ObjectUtils;
+import org.skywalking.apm.collector.core.util.StringUtils;
 
 /**
  * @author pengys5
*/
 public class StorageH2ConfigParser implements ModuleConfigParser {
-
+    private static final String URL = "url";
+    public static final String USER_NAME = "user_name";
+    public static final String PASSWORD = "password";
     @Override public void parse(Map config) throws ConfigParseException {
+        if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(URL))) {
+            StorageH2Config.URL = (String)config.get(URL);
+        }
+        if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(USER_NAME))) {
+            StorageH2Config.USER_NAME = (String)config.get(USER_NAME);
+        }
+        if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(PASSWORD))) {
+            StorageH2Config.PASSWORD = (String)config.get(PASSWORD);
+        }
     }
 }
diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java
index 99575e2b2702a12b318f05b49fe2c92c6a8b9f64..a75a1c1fa8268a74e59f3e0e5684074ba3891b5e 100644
--- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java
+++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java
@@ -33,7 +33,7 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
     }
 
     @Override protected Client createClient() {
-        return new H2Client();
+        return new H2Client(StorageH2Config.URL, StorageH2Config.USER_NAME, StorageH2Config.PASSWORD);
     }
 
     @Override public StorageInstaller storageInstaller() {
diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java
index a9c9ddb5adb36b21927cbebc323791384991b1ea..1d703e9544fe7c9bf4068568f3b448cb23171a85 100644
--- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java
+++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/dao/H2DAO.java
@@ -1,10 +1,47 @@
 package org.skywalking.apm.collector.storage.h2.dao;
 
 import org.skywalking.apm.collector.client.h2.H2Client;
+import org.skywalking.apm.collector.client.h2.H2ClientException;
 import org.skywalking.apm.collector.storage.dao.DAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
 
 /**
  * @author pengys5
*/
 public abstract class H2DAO extends DAO {
+    private final Logger logger = LoggerFactory.getLogger(H2DAO.class);
+    public final int getMaxId(String tableName, String columnName) {
+        String sql = "select max(" + columnName + ") from " + tableName;
+        return getIntValueBySQL(sql);
+    }
+
+    public final int getMinId(String tableName, String columnName) {
+        String sql = "select min(" + columnName + ") from " + tableName;
+        return getIntValueBySQL(sql);
+    }
+
+    public final int getIntValueBySQL(String sql) {
+        H2Client client = getClient();
+        ResultSet rs = null;
+        try {
+            rs = client.executeQuery(sql, null);
+            if (rs.next()) {
+                int id = rs.getInt(1);
+                if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
+                    return 0;
+                } else {
+                    return id;
+                }
+            }
+        } catch (SQLException | H2ClientException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            client.closeResultSet(rs);
+        }
+        return 0;
+    }
 }
diff --git a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2StorageInstaller.java b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2StorageInstaller.java
index 3244808a5422b50f347dbbe215915d430ce52f68..12718a66c6c2106cb8364f65074a77c19d4879ed 100644
--- a/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2StorageInstaller.java
+++ b/apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2StorageInstaller.java
@@ -1,6 +1,10 @@
 package org.skywalking.apm.collector.storage.h2.define;
 
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.List;
+
+import org.h2.util.IOUtils;
 import org.skywalking.apm.collector.client.h2.H2Client;
 import org.skywalking.apm.collector.client.h2.H2ClientException;
 import org.skywalking.apm.collector.core.client.Client;
@@ -8,12 +12,16 @@ import org.skywalking.apm.collector.core.storage.StorageException;
 import org.skywalking.apm.collector.core.storage.StorageInstallException;
 import org.skywalking.apm.collector.core.storage.StorageInstaller;
 import org.skywalking.apm.collector.core.storage.TableDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author pengys5
*/
 public class H2StorageInstaller extends StorageInstaller {
+    private final Logger logger = LoggerFactory.getLogger(H2StorageInstaller.class);
+
     @Override protected void defineFilter(List<TableDefine> tableDefines) {
         int size = tableDefines.size();
         for (int i = size - 1; i >= 0; i--) {
@@ -24,11 +32,36 @@ public class H2StorageInstaller extends StorageInstaller {
     }
 
     @Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
+        H2Client h2Client = (H2Client)client;
+        ResultSet rs = null;
+        try {
+            logger.info("check if table {} exist ", tableDefine.getName());
+            rs = h2Client.getConnection().getMetaData().getTables(null, null, tableDefine.getName().toUpperCase(), null);
+            if (rs.next()) {
+                return true;
+            }
+        } catch (SQLException | H2ClientException e) {
+            throw new StorageInstallException(e.getMessage(), e);
+        } finally {
+            try {
+                if (rs != null) {
+                    rs.close();
+                }
+            } catch (SQLException e) {
+                throw new StorageInstallException(e.getMessage(), e);
+            }
+        }
         return false;
     }
 
     @Override protected boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException {
-        return false;
+        H2Client h2Client = (H2Client)client;
+        try {
+            h2Client.execute("drop table if exists " + tableDefine.getName());
+            return true;
+        } catch (H2ClientException e) {
+            throw new StorageInstallException(e.getMessage(), e);
+        }
     }
 
     @Override protected boolean createTable(Client client, TableDefine tableDefine) throws StorageException {
@@ -41,14 +74,16 @@ public class H2StorageInstaller extends StorageInstaller {
         h2TableDefine.getColumnDefines().forEach(columnDefine -> {
             H2ColumnDefine h2ColumnDefine = (H2ColumnDefine)columnDefine;
             if (h2ColumnDefine.getType().equals(H2ColumnDefine.Type.Varchar.name())) {
-                sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType()).append("(255)");
+                sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType()).append("(255),");
             } else {
-                sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType());
+                sqlBuilder.append(h2ColumnDefine.getName()).append(" ").append(h2ColumnDefine.getType()).append(",");
             }
         });
-
+        //remove last comma
+        sqlBuilder.delete(sqlBuilder.length() - 1, sqlBuilder.length());
         sqlBuilder.append(")");
         try {
+            logger.info("create h2 table with sql {}", sqlBuilder);
             h2Client.execute(sqlBuilder.toString());
         } catch (H2ClientException e) {
             throw new StorageInstallException(e.getMessage(), e);