提交 1b8db8ce 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge branch 'master' into wu-sheng-3.2-readme

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-application-toolkit</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-toolkit-trace</artifactId>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
<build>
<plugins>
<plugin>
<!-- 源码插件 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<version>2.4</version>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>bintray-wu-sheng-sky-walking-repository</id>
<name>wu-sheng-sky-walking-repository</name>
<url>
https://api.bintray.com/maven/wu-sheng/skywalking/org.skywalking.apm-toolkit-trace/;publish=1
</url>
</repository>
</distributionManagement>
</project>
package org.skywalking.apm.toolkit.trace;
/**
* provide custom api that set tag for current active span.
*
* @author zhangxin
*/
public class ActiveSpan {
/**
* @param key tag key
* @param value tag value
*/
public static void tag(String key, String value) {
}
}
package org.skywalking.apm.toolkit.trace;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* The agent create local span if the method that annotation with {@link Trace}. The value of span operation name will
* fetch by {@link #operationName()}. if the value of {@link #operationName()} is blank string. the operation name will
* be set the class name + method name.
*
* @author zhangxin
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Trace {
/**
* @return operation name, the default value is blank string.
*/
String operationName() default "";
}
......@@ -19,5 +19,6 @@
<module>apm-toolkit-logback-1.x</module>
<module>apm-toolkit-trace-context</module>
<module>apm-toolkit-opentracing</module>
<module>apm-toolkit-trace</module>
</modules>
</project>
......@@ -3,23 +3,23 @@ package org.skywalking.apm.collector.agentjvm.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
......@@ -44,7 +44,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, time);
senToInstanceHeartBeatPersistenceWorker(context, applicationInstanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(context, applicationInstanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, applicationInstanceId, time, metric.getMemoryPoolList());
......@@ -59,8 +59,8 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
long heartBeatTime) {
InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
heartBeat.setId(String.valueOf(applicationInstanceId));
heartBeat.setHeartbeatTime(heartBeatTime);
heartBeat.setApplicationInstanceId(applicationInstanceId);
heartBeat.setHeartBeatTime(heartBeatTime);
heartBeat.setInstanceId(applicationInstanceId);
try {
logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData());
......@@ -111,7 +111,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
memoryPools.forEach(memoryPool -> {
MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric();
memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId + Const.ID_SPLIT + memoryPool.getIsHeap() + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
memoryPoolMetric.setApplicationInstanceId(applicationInstanceId);
memoryPoolMetric.setPoolType(memoryPool.getType().getNumber());
memoryPoolMetric.setHeap(memoryPool.getIsHeap());
......@@ -139,6 +139,7 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
gcMetric.setCount(gc.getCount());
gcMetric.setTime(gc.getTime());
gcMetric.setTimeBucket(timeBucket);
gcMetric.setS5TimeBucket(TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(timeBucket));
try {
logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId());
context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData());
......
package org.skywalking.apm.collector.agentjvm.worker.cpu;
import org.skywalking.apm.collector.agentjvm.worker.cpu.dao.ICpuMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -4,17 +4,21 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.cpu.define.CpuMetricTable;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(CpuMetricEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
}
......@@ -25,6 +29,7 @@ public class CpuMetricEsDAO extends EsDAO implements ICpuMetricDAO, IPersistence
source.put(CpuMetricTable.COLUMN_USAGE_PERCENT, data.getDataDouble(0));
source.put(CpuMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
logger.debug("prepare cpu metric batch insert, id: {}", data.getDataString(0));
return getClient().prepareIndex(CpuMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentjvm.worker.cpu.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
/**
* @author pengys5
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentjvm.worker.cpu.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentjvm.worker.gc;
import org.skywalking.apm.collector.agentjvm.worker.gc.dao.IGCMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -4,11 +4,11 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.gc.define.GCMetricTable;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
......@@ -22,10 +22,11 @@ public class GCMetricEsDAO extends EsDAO implements IGCMetricDAO, IPersistenceDA
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(GCMetricTable.COLUMN_APPLICATION_INSTANCE_ID, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(0));
source.put(GCMetricTable.COLUMN_PHRASE, data.getDataInteger(1));
source.put(GCMetricTable.COLUMN_COUNT, data.getDataLong(0));
source.put(GCMetricTable.COLUMN_TIME, data.getDataLong(1));
source.put(GCMetricTable.COLUMN_TIME_BUCKET, data.getDataLong(2));
source.put(GCMetricTable.COLUMN_5S_TIME_BUCKET, data.getDataLong(3));
return getClient().prepareIndex(GCMetricTable.TABLE, data.getDataString(0)).setSource(source);
}
......
package org.skywalking.apm.collector.agentjvm.worker.gc.define;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
......@@ -30,5 +31,6 @@ public class GCMetricEsTableDefine extends ElasticSearchTableDefine {
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_COUNT, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(GCMetricTable.COLUMN_5S_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentjvm.worker.gc.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
/**
* @author pengys5
......@@ -19,5 +20,6 @@ public class GCMetricH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_COUNT, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(GCMetricTable.COLUMN_5S_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentjvm.worker.heartbeat;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.IInstanceHeartBeatDAO;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -27,11 +27,11 @@ public class InstHeartBeatPersistenceWorker extends PersistenceWorker {
}
@Override protected boolean needMergeDBData() {
return false;
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(InstanceHeartBeatEsDAO.class.getName());
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IInstanceHeartBeatDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstHeartBeatPersistenceWorker> {
......
......@@ -2,30 +2,47 @@ package org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceHeartBeatEsDAO extends EsDAO implements IInstanceHeartBeatDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
return null;
GetResponse getResponse = getClient().prepareGet(InstanceTable.TABLE, id).get();
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(InstanceTable.COLUMN_INSTANCE_ID));
data.setDataLong(0, (Long)source.get(InstanceTable.COLUMN_HEARTBEAT_TIME));
logger.debug("id: {} is exists", id);
return data;
} else {
logger.debug("id: {} is not exists", id);
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
return null;
throw new UnexpectedException("There is no need to merge stream data with database data.");
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(InstanceTable.COLUMN_REGISTER_TIME, data.getDataLong(0));
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, data.getDataLong(0));
return getClient().prepareUpdate(InstanceTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentjvm.worker.heartbeat.define;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.Transform;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
/**
* @author pengys5
......@@ -22,27 +21,35 @@ public class InstanceHeartBeatDataDefine extends DataDefine {
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(InstanceTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
throw new UnexpectedException("instance heart beat data did not need send to remote worker.");
String id = remoteData.getDataStrings(0);
int instanceId = remoteData.getDataIntegers(0);
long heartBeatTime = remoteData.getDataLongs(0);
return new InstanceHeartBeat(id, heartBeatTime, instanceId);
}
@Override public RemoteData serialize(Object object) {
throw new UnexpectedException("instance heart beat data did not need send to remote worker.");
InstanceHeartBeat instanceHeartBeat = (InstanceHeartBeat)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(instanceHeartBeat.getId());
builder.addDataIntegers(instanceHeartBeat.getInstanceId());
builder.addDataLongs(instanceHeartBeat.getHeartBeatTime());
return builder.build();
}
public static class InstanceHeartBeat implements Transform<InstanceHeartBeat> {
private String id;
private int applicationInstanceId;
private long heartbeatTime;
private long heartBeatTime;
private int instanceId;
public InstanceHeartBeat(String id, int applicationInstanceId, long heartbeatTime) {
public InstanceHeartBeat(String id, long heartBeatTime, int instanceId) {
this.id = id;
this.applicationInstanceId = applicationInstanceId;
this.heartbeatTime = heartbeatTime;
this.heartBeatTime = heartBeatTime;
this.instanceId = instanceId;
}
public InstanceHeartBeat() {
......@@ -52,40 +59,40 @@ public class InstanceHeartBeatDataDefine extends DataDefine {
InstanceHeartBeatDataDefine define = new InstanceHeartBeatDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataInteger(0, this.applicationInstanceId);
data.setDataLong(0, this.heartbeatTime);
data.setDataInteger(0, this.instanceId);
data.setDataLong(0, this.heartBeatTime);
return data;
}
@Override public InstanceHeartBeat toSelf(Data data) {
this.id = data.getDataString(0);
this.applicationInstanceId = data.getDataInteger(0);
this.heartbeatTime = data.getDataLong(0);
this.instanceId = data.getDataInteger(0);
this.heartBeatTime = data.getDataLong(0);
return this;
}
public void setId(String id) {
this.id = id;
public String getId() {
return id;
}
public void setApplicationInstanceId(int applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
public long getHeartBeatTime() {
return heartBeatTime;
}
public int getApplicationInstanceId() {
return applicationInstanceId;
public void setHeartBeatTime(long heartBeatTime) {
this.heartBeatTime = heartBeatTime;
}
public long getHeartbeatTime() {
return heartbeatTime;
public int getInstanceId() {
return instanceId;
}
public void setHeartbeatTime(long heartbeatTime) {
this.heartbeatTime = heartbeatTime;
public void setInstanceId(int instanceId) {
this.instanceId = instanceId;
}
}
}
package org.skywalking.apm.collector.agentjvm.worker.memory;
import org.skywalking.apm.collector.agentjvm.worker.memory.dao.IMemoryMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -4,11 +4,11 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.memory.define.MemoryMetricTable;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentjvm.worker.memory.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricTable;
/**
* @author pengys5
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentjvm.worker.memory.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentjvm.worker.memorypool;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.IMemoryPoolMetricDAO;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -4,11 +4,11 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.define.MemoryPoolMetricTable;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentjvm.worker.memorypool.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricTable;
/**
* @author pengys5
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentjvm.worker.memorypool.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricTable;
/**
* @author pengys5
......
org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricEsDAO
org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.worker.cpu.dao.CpuMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memory.dao.MemoryMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.memorypool.dao.MemoryPoolMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.worker.gc.dao.GCMetricH2DAO
org.skywalking.apm.collector.agentjvm.worker.heartbeat.dao.InstanceHeartBeatH2DAO
\ No newline at end of file
......@@ -2,6 +2,8 @@ package org.skywalking.apm.collector.agentjvm.grpc.handler;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.GCPhrase;
......@@ -21,14 +23,24 @@ public class JVMMetricsServiceHandlerTestCase {
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandlerTestCase.class);
private JVMMetricsServiceGrpc.JVMMetricsServiceBlockingStub stub;
private static JVMMetricsServiceGrpc.JVMMetricsServiceBlockingStub stub;
public void test() {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = JVMMetricsServiceGrpc.newBlockingStub(channel);
final long timeInterval = 1;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> multiInstanceJvmSend(), 1, timeInterval, TimeUnit.SECONDS);
}
public static void multiInstanceJvmSend() {
buildJvmMetric(2);
buildJvmMetric(3);
}
private static void buildJvmMetric(int instanceId) {
JVMMetrics.Builder jvmMetricsBuilder = JVMMetrics.newBuilder();
jvmMetricsBuilder.setApplicationInstanceId(1);
jvmMetricsBuilder.setApplicationInstanceId(instanceId);
JVMMetric.Builder jvmMetric = JVMMetric.newBuilder();
jvmMetric.setTime(System.currentTimeMillis());
......@@ -41,13 +53,13 @@ public class JVMMetricsServiceHandlerTestCase {
stub.collect(jvmMetricsBuilder.build());
}
private void buildCpuMetric(JVMMetric.Builder jvmMetric) {
private static void buildCpuMetric(JVMMetric.Builder jvmMetric) {
CPU.Builder cpuBuilder = CPU.newBuilder();
cpuBuilder.setUsagePercent(70);
jvmMetric.setCpu(cpuBuilder);
}
private void buildMemoryMetric(JVMMetric.Builder jvmMetric) {
private static void buildMemoryMetric(JVMMetric.Builder jvmMetric) {
Memory.Builder builder_1 = Memory.newBuilder();
builder_1.setIsHeap(true);
builder_1.setInit(20);
......@@ -65,7 +77,7 @@ public class JVMMetricsServiceHandlerTestCase {
jvmMetric.addMemory(builder_2.build());
}
private void buildMemoryPoolMetric(JVMMetric.Builder jvmMetric) {
private static void buildMemoryPoolMetric(JVMMetric.Builder jvmMetric) {
MemoryPool.Builder builder_1 = MemoryPool.newBuilder();
builder_1.setType(PoolType.NEWGEN_USAGE);
builder_1.setIsHeap(true);
......@@ -76,11 +88,17 @@ public class JVMMetricsServiceHandlerTestCase {
jvmMetric.addMemoryPool(builder_1.build());
}
private void buildGcMetric(JVMMetric.Builder jvmMetric) {
GC.Builder gcBuilder = GC.newBuilder();
gcBuilder.setPhrase(GCPhrase.NEW);
gcBuilder.setCount(2);
gcBuilder.setTime(100);
jvmMetric.addGc(gcBuilder.build());
private static void buildGcMetric(JVMMetric.Builder jvmMetric) {
GC.Builder newGcBuilder = GC.newBuilder();
newGcBuilder.setPhrase(GCPhrase.NEW);
newGcBuilder.setCount(2);
newGcBuilder.setTime(100);
jvmMetric.addGc(newGcBuilder.build());
GC.Builder oldGcBuilder = GC.newBuilder();
oldGcBuilder.setPhrase(GCPhrase.OLD);
oldGcBuilder.setCount(2);
oldGcBuilder.setTime(100);
jvmMetric.addGc(oldGcBuilder.build());
}
}
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.agentstream.worker.cache.ApplicationCache;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......
package org.skywalking.apm.collector.agentregister.grpc.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.instance.InstanceIDService;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
......@@ -9,6 +11,7 @@ import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.OSInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -23,7 +26,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime());
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
......@@ -39,8 +42,22 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), request.getRegisterTime());
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private String buildOsInfo(OSInfo osinfo) {
JsonObject osInfoJson = new JsonObject();
osInfoJson.addProperty("osName", osinfo.getOsName());
osInfoJson.addProperty("hostName", osinfo.getHostname());
osInfoJson.addProperty("processId", osinfo.getProcessNo());
JsonArray ipv4Array = new JsonArray();
osinfo.getIpv4SList().forEach(ipv4 -> {
ipv4Array.add(ipv4);
});
osInfoJson.add("ipv4s", ipv4Array);
return osInfoJson.toString();
}
}
package org.skywalking.apm.collector.agentregister.instance;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
......@@ -19,14 +19,14 @@ public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
public int getOrCreate(int applicationId, String agentUUID, long registerTime) {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}", applicationId, agentUUID, registerTime);
public int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo) {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int instanceId = dao.getInstanceId(applicationId, agentUUID);
if (instanceId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance("0", applicationId, agentUUID, registerTime, 0);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance("0", applicationId, agentUUID, registerTime, 0, registerTime, osInfo);
try {
context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(instance);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
......@@ -42,11 +42,11 @@ public class InstanceIDService {
dao.updateHeartbeatTime(instanceId, heartbeatTime);
}
public void recover(int instanceId, int applicationId, long registerTime) {
public void recover(int instanceId, int applicationId, long registerTime, String osInfo) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId, registerTime, osInfo);
dao.save(instance);
}
}
......@@ -25,6 +25,7 @@ public class InstanceDiscoveryServletHandler extends JettyHandler {
private static final String AGENT_UUID = "au";
private static final String REGISTER_TIME = "rt";
private static final String INSTANCE_ID = "ii";
private static final String OS_INFO = "oi";
@Override public String pathSpec() {
return "/instance/register";
......@@ -41,8 +42,9 @@ public class InstanceDiscoveryServletHandler extends JettyHandler {
int applicationId = instance.get(APPLICATION_ID).getAsInt();
String agentUUID = instance.get(AGENT_UUID).getAsString();
long registerTime = instance.get(REGISTER_TIME).getAsLong();
JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject();
int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime);
int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime, osInfo.toString());
responseJson.addProperty(APPLICATION_ID, applicationId);
responseJson.addProperty(INSTANCE_ID, instanceId);
} catch (IOException e) {
......
package org.skywalking.apm.collector.agentregister.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
......
package org.skywalking.apm.collector.agentstream.worker.global;
import org.skywalking.apm.collector.agentstream.worker.global.dao.IGlobalTraceDAO;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.storage.define.global.GlobalTraceDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -2,10 +2,10 @@ package org.skywalking.apm.collector.agentstream.worker.global;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.global.define.GlobalTraceDataDefine;
import org.skywalking.apm.collector.storage.define.global.GlobalTraceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.GlobalTraceIdsListener;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
......@@ -6,10 +6,10 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.global.GlobalTraceTable;
import org.skywalking.apm.collector.storage.define.global.GlobalTraceTable;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.global.GlobalTraceTable;
import org.skywalking.apm.collector.storage.define.global.GlobalTraceTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.global.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.global.GlobalTraceTable;
import org.skywalking.apm.collector.storage.define.global.GlobalTraceTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.instance.performance;
import org.skywalking.apm.collector.agentstream.worker.instance.performance.dao.IInstPerformanceDAO;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class InstPerformancePersistenceWorker extends PersistenceWorker {
public InstPerformancePersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected boolean needMergeDBData() {
return true;
}
@Override protected IPersistenceDAO persistenceDAO() {
return (IPersistenceDAO)DAOContainer.INSTANCE.get(IInstPerformanceDAO.class.getName());
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstPerformancePersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public InstPerformancePersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new InstPerformancePersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return InstPerformancePersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new InstPerformanceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.instance.performance;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstPerformanceSpanListener implements EntrySpanListener, FirstSpanListener {
private final Logger logger = LoggerFactory.getLogger(InstPerformanceSpanListener.class);
private int applicationId;
private int applicationInstanceId;
private long cost;
private long timeBucket;
@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
}
@Override
public void parseFirst(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
this.applicationId = applicationId;
this.applicationInstanceId = applicationInstanceId;
this.cost = spanObject.getEndTime() - spanObject.getStartTime();
timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(spanObject.getStartTime());
}
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
InstPerformanceDataDefine.InstPerformance instPerformance = new InstPerformanceDataDefine.InstPerformance();
instPerformance.setId(timeBucket + Const.ID_SPLIT + applicationInstanceId);
instPerformance.setApplicationId(applicationId);
instPerformance.setInstanceId(applicationInstanceId);
instPerformance.setCallTimes(1);
instPerformance.setCostTotal(cost);
instPerformance.setTimeBucket(timeBucket);
instPerformance.setS5TimeBucket(TimeBucketUtils.INSTANCE.getFiveSecondTimeBucket(timeBucket));
try {
logger.debug("send to instance performance persistence worker, id: {}", instPerformance.getId());
context.getClusterWorkerContext().lookup(InstPerformancePersistenceWorker.WorkerRole.INSTANCE).tell(instPerformance.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.agentstream.worker.instance.performance.dao;
/**
* @author pengys5
*/
public interface IInstPerformanceDAO {
}
package org.skywalking.apm.collector.agentstream.worker.instance.performance.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstPerformanceEsDAO extends EsDAO implements IInstPerformanceDAO, IPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder> {
private final Logger logger = LoggerFactory.getLogger(InstPerformanceEsDAO.class);
@Override public Data get(String id, DataDefine dataDefine) {
GetResponse getResponse = getClient().prepareGet(InstPerformanceTable.TABLE, id).get();
if (getResponse.isExists()) {
logger.debug("id: {} is exist", id);
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataInteger(0, (Integer)source.get(InstPerformanceTable.COLUMN_APPLICATION_ID));
data.setDataInteger(1, (Integer)source.get(InstPerformanceTable.COLUMN_INSTANCE_ID));
data.setDataInteger(2, (Integer)source.get(InstPerformanceTable.COLUMN_CALL_TIMES));
data.setDataLong(0, ((Number)source.get(InstPerformanceTable.COLUMN_COST_TOTAL)).longValue());
data.setDataLong(1, ((Number)source.get(InstPerformanceTable.COLUMN_TIME_BUCKET)).longValue());
data.setDataLong(2, ((Number)source.get(InstPerformanceTable.COLUMN_5S_TIME_BUCKET)).longValue());
return data;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(InstPerformanceTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(InstPerformanceTable.COLUMN_INSTANCE_ID, data.getDataInteger(1));
source.put(InstPerformanceTable.COLUMN_CALL_TIMES, data.getDataInteger(2));
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0));
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1));
source.put(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, data.getDataLong(2));
return getClient().prepareIndex(InstPerformanceTable.TABLE, data.getDataString(0)).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(InstPerformanceTable.COLUMN_APPLICATION_ID, data.getDataInteger(0));
source.put(InstPerformanceTable.COLUMN_INSTANCE_ID, data.getDataInteger(1));
source.put(InstPerformanceTable.COLUMN_CALL_TIMES, data.getDataInteger(2));
source.put(InstPerformanceTable.COLUMN_COST_TOTAL, data.getDataLong(0));
source.put(InstPerformanceTable.COLUMN_TIME_BUCKET, data.getDataLong(1));
source.put(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, data.getDataLong(2));
return getClient().prepareUpdate(InstPerformanceTable.TABLE, data.getDataString(0)).setDoc(source);
}
}
package org.skywalking.apm.collector.agentstream.worker.instance.performance.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class InstPerformanceH2DAO extends H2DAO implements IInstPerformanceDAO {
}
package org.skywalking.apm.collector.agentstream.worker.instance.performance.define;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class InstPerformanceEsTableDefine extends ElasticSearchTableDefine {
public InstPerformanceEsTableDefine() {
super(InstPerformanceTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.instance.performance.define;
import org.skywalking.apm.collector.storage.define.instance.InstPerformanceTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class InstPerformanceH2TableDefine extends H2TableDefine {
public InstPerformanceH2TableDefine() {
super(InstPerformanceTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_CALL_TIMES, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_COST_TOTAL, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstPerformanceTable.COLUMN_5S_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.node.component.dao.INodeComponentDAO;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -3,13 +3,13 @@ package org.skywalking.apm.collector.agentstream.worker.node.component;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.node.component.define.NodeComponentDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
......@@ -5,11 +5,11 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.node.component.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.INodeMappingDAO;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -3,11 +3,11 @@ package org.skywalking.apm.collector.agentstream.worker.node.mapping;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.define.NodeMappingDataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeMappingDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
......@@ -5,11 +5,11 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
import org.skywalking.apm.collector.storage.define.node.NodeMappingTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.node.mapping.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
import org.skywalking.apm.collector.storage.define.node.NodeMappingTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.node.mapping.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.node.NodeMappingTable;
import org.skywalking.apm.collector.storage.define.node.NodeMappingTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -4,13 +4,13 @@ import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.define.NodeRefDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
......@@ -5,11 +5,11 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.table.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......
......@@ -2,8 +2,8 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.reference.define
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.reference.define
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.WorkerRefs;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.INodeRefSumDAO;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -4,13 +4,13 @@ import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.cache.InstanceCache;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.define.NodeRefSumDataDefine;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.RefsListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
......@@ -5,12 +5,12 @@ import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.table.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.table.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.summary.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.noderef.summary.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.noderef.NodeRefSumTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
/**
* @author pengys5
......@@ -14,7 +14,7 @@ public class ApplicationEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 0;
return 2;
}
@Override public int numberOfShards() {
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
......
......@@ -4,13 +4,14 @@ import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
......
......@@ -9,8 +9,8 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.table.register.ApplicationTable;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
......
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
......
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
/**
* @author pengys5
......@@ -14,7 +14,7 @@ public class InstanceEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 0;
return 2;
}
@Override public int numberOfShards() {
......@@ -31,5 +31,6 @@ public class InstanceEsTableDefine extends ElasticSearchTableDefine {
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_OS_INFO, ElasticSearchColumnDefine.Type.Keyword.name()));
}
}
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
/**
* @author pengys5
......@@ -19,5 +19,6 @@ public class InstanceH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_OS_INFO, H2ColumnDefine.Type.Varchar.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
......
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.storage.define.register.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
......
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
/**
* @author pengys5
......
......@@ -12,10 +12,10 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -62,6 +62,8 @@ public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
source.put(InstanceTable.COLUMN_APPLICATION_ID, instance.getApplicationId());
source.put(InstanceTable.COLUMN_AGENT_UUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_REGISTER_TIME, instance.getRegisterTime());
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, instance.getHeartBeatTime());
source.put(InstanceTable.COLUMN_OS_INFO, instance.getOsInfo());
IndexResponse response = client.prepareIndex(InstanceTable.TABLE, instance.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save instance register info, application id: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name());
......
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
/**
* @author pengys5
......@@ -14,7 +14,7 @@ public class ServiceNameEsTableDefine extends ElasticSearchTableDefine {
}
@Override public int refreshInterval() {
return 0;
return 2;
}
@Override public int numberOfShards() {
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
/**
* @author pengys5
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
......
......@@ -3,13 +3,14 @@ package org.skywalking.apm.collector.agentstream.worker.register.servicename;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
/**
* @author pengys5
......
......@@ -10,8 +10,8 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
......
package org.skywalking.apm.collector.agentstream.worker.register.servicename.dao;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.define.register.ServiceNameDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
......
......@@ -3,13 +3,14 @@ package org.skywalking.apm.collector.agentstream.worker.segment;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.global.GlobalTraceSpanListener;
import org.skywalking.apm.collector.agentstream.worker.instance.performance.InstPerformanceSpanListener;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentSpanListener;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.summary.NodeRefSumSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agentstream.worker.segment.origin.define.SegmentDataDefine;
import org.skywalking.apm.collector.storage.define.segment.SegmentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.service.entry.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.serviceref.reference.ServiceRefSpanListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
......@@ -45,6 +46,7 @@ public class SegmentParse {
spanListeners.add(new GlobalTraceSpanListener());
spanListeners.add(new ServiceEntrySpanListener());
spanListeners.add(new ServiceRefSpanListener());
spanListeners.add(new InstPerformanceSpanListener());
}
public void parse(List<UniqueId> traceIds, TraceSegmentObject segmentObject) {
......
package org.skywalking.apm.collector.agentstream.worker.segment.cost;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.ISegmentCostDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -9,7 +9,7 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......
......@@ -6,9 +6,9 @@ import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.cost.define.SegmentCostDataDefine;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostDataDefine;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.stream.worker.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
......
......@@ -4,11 +4,11 @@ import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.storage.table.segment.SegmentCostTable;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.segment.SegmentCostTable;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostTable;
/**
* @author pengys5
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentstream.worker.segment.cost.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.segment.SegmentCostTable;
import org.skywalking.apm.collector.storage.define.segment.SegmentCostTable;
/**
* @author pengys5
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册