Commit e216a369 authored by pengys5

Segment save to es success

Parent 33fcede9
......@@ -21,7 +21,7 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO {
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource();
IndexRequestBuilder builder = getClient().prepareIndex(NodeComponentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
......
......@@ -21,7 +21,7 @@ public class NodeMappingEsDAO extends EsDAO implements INodeMappingDAO {
source.put(NodeMappingTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeMappingTable.TABLE, id).setSource();
IndexRequestBuilder builder = getClient().prepareIndex(NodeMappingTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
......
......@@ -21,7 +21,7 @@ public class NodeReferenceEsDAO extends EsDAO implements INodeReferenceDAO {
source.put(NodeRefTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeRefTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
IndexRequestBuilder builder = getClient().prepareIndex(NodeRefTable.TABLE, id).setSource();
IndexRequestBuilder builder = getClient().prepareIndex(NodeRefTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
......
......@@ -5,7 +5,13 @@ import java.util.List;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentSpanListener;
import org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingSpanListener;
import org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.SpanObject;
import org.skywalking.apm.network.proto.SpanType;
import org.skywalking.apm.network.proto.TraceSegmentObject;
......@@ -68,6 +74,26 @@ public class SegmentParse {
}
notifyListenerToBuild();
StringBuilder segmentId = new StringBuilder();
segmentObject.getTraceSegmentId().getIdPartsList().forEach(part -> {
segmentId.append(part);
});
buildSegment(segmentId.toString(), segmentObject.toByteArray());
}
public void buildSegment(String id, byte[] dataBinary) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
SegmentDataDefine.Segment segment = new SegmentDataDefine.Segment();
segment.setId(id);
segment.setDataBinary(dataBinary);
try {
logger.debug("send to segment persistence worker, id: {}, dataBinary length: {}", segment.getId(), dataBinary.length);
context.getClusterWorkerContext().lookup(SegmentPersistenceWorker.WorkerRole.INSTANCE).tell(segment.transform());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
private void notifyListenerToBuild() {
......
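The new buildSegment path above concatenates the idParts of the trace segment UniqueId with no separator before handing the segment to SegmentPersistenceWorker. A minimal alternative sketch that keeps the parts distinguishable is shown below; the "." separator and the joinIdParts helper are illustrative assumptions, not part of this commit:

    // Hypothetical helper: join the long parts of a UniqueId with a separator,
    // so that e.g. (1, 23) and (12, 3) do not both render as "123".
    private String joinIdParts(org.skywalking.apm.network.proto.UniqueId uniqueId) {
        StringBuilder segmentId = new StringBuilder();
        for (int i = 0; i < uniqueId.getIdPartsCount(); i++) {
            if (i > 0) {
                segmentId.append(".");  // assumed separator
            }
            segmentId.append(uniqueId.getIdParts(i));
        }
        return segmentId.toString();
    }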
package org.skywalking.apm.collector.agentstream.worker.segment;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.segment.dao.ISegmentDAO;
import org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentDataDefine;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
*/
public class SegmentPersistenceWorker extends PersistenceWorker {
public SegmentPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected List<?> prepareBatch(Map<String, Data> dataMap) {
ISegmentDAO dao = (ISegmentDAO)DAOContainer.INSTANCE.get(ISegmentDAO.class.getName());
return dao.prepareBatch(dataMap);
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<SegmentPersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public SegmentPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
return new SegmentPersistenceWorker(role(), clusterContext);
}
@Override
public int queueSize() {
return 1024;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return SegmentPersistenceWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new SegmentDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
/**
* @author pengys5
*/
public interface ISegmentDAO {
List<?> prepareBatch(Map<String, Data> dataMap);
}
package org.skywalking.apm.collector.agentstream.worker.segment.dao;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class SegmentEsDAO extends EsDAO implements ISegmentDAO {
private final Logger logger = LoggerFactory.getLogger(SegmentEsDAO.class);
@Override public List<?> prepareBatch(Map<String, Data> dataMap) {
List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
dataMap.forEach((id, data) -> {
logger.debug("segment prepareBatch, id: {}", id);
Map<String, Object> source = new HashMap<>();
source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBytes(0))));
logger.debug("segment source: {}", source.toString());
IndexRequestBuilder builder = getClient().prepareIndex(SegmentTable.TABLE, id).setSource(source);
indexRequestBuilders.add(builder);
});
return indexRequestBuilders;
}
}
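SegmentEsDAO writes the raw segment bytes into the data_binary column as a Base64 string. A read-side sketch of the same convention follows (the readSegment method name and the source map argument are illustrative assumptions, not code from this commit; it assumes Base64, TraceSegmentObject and InvalidProtocolBufferException are imported):

    // Decode the Base64 text written by SegmentEsDAO back into the protobuf segment.
    public TraceSegmentObject readSegment(Map<String, Object> source) throws InvalidProtocolBufferException {
        String encoded = (String)source.get(SegmentTable.COLUMN_DATA_BINARY);
        byte[] dataBinary = Base64.getDecoder().decode(encoded);
        return TraceSegmentObject.parseFrom(dataBinary);
    }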
package org.skywalking.apm.collector.agentstream.worker.segment.dao;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class SegmentH2DAO extends H2DAO implements ISegmentDAO {
@Override public List<?> prepareBatch(Map map) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.define;
import com.google.protobuf.ByteString;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.TransformToData;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class SegmentDataDefine extends DataDefine {
public static final int DEFINE_ID = 401;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 2;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute(SegmentTable.COLUMN_ID, AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(SegmentTable.COLUMN_DATA_BINARY, AttributeType.BYTE, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
byte[] dataBinary = remoteData.getDataBytes(0).toByteArray();
return new Segment(id, dataBinary);
}
@Override public RemoteData serialize(Object object) {
Segment segment = (Segment)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(segment.getId());
builder.addDataBytes(ByteString.copyFrom(segment.getDataBinary()));
return builder.build();
}
public static class Segment implements TransformToData {
private String id;
private byte[] dataBinary;
public Segment(String id, byte[] dataBinary) {
this.id = id;
this.dataBinary = dataBinary;
}
public Segment() {
}
@Override public Data transform() {
SegmentDataDefine define = new SegmentDataDefine();
Data data = define.build(id);
data.setDataString(0, this.id);
data.setDataBytes(0, this.dataBinary);
return data;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public byte[] getDataBinary() {
return dataBinary;
}
public void setDataBinary(byte[] dataBinary) {
this.dataBinary = dataBinary;
}
}
}
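SegmentDataDefine maps a segment onto two attributes: string slot 0 holds the id and byte slot 0 holds the serialized TraceSegmentObject, so transform(), serialize() and deserialize() all read and write the same positions. A round-trip sketch, assuming it runs inside the collector and that "example-segment-id" and dataBinary are placeholders:

    SegmentDataDefine define = new SegmentDataDefine();
    SegmentDataDefine.Segment segment = new SegmentDataDefine.Segment("example-segment-id", dataBinary);

    Data data = segment.transform();              // string slot 0 = id, byte slot 0 = dataBinary
    RemoteData wire = define.serialize(segment);  // same layout on the wire
    SegmentDataDefine.Segment copy = (SegmentDataDefine.Segment)define.deserialize(wire);
    // copy.getId() equals segment.getId(), copy.getDataBinary() equals dataBinary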
package org.skywalking.apm.collector.agentstream.worker.segment.define;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class SegmentEsTableDefine extends ElasticSearchTableDefine {
public SegmentEsTableDefine() {
super(SegmentTable.TABLE);
}
@Override public int refreshInterval() {
return 10;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(SegmentTable.COLUMN_DATA_BINARY, ElasticSearchColumnDefine.Type.Binary.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.define;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class SegmentH2TableDefine extends H2TableDefine {
public SegmentH2TableDefine() {
super(SegmentTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(SegmentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(SegmentTable.COLUMN_DATA_BINARY, H2ColumnDefine.Type.BINARY.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.segment.define;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
/**
* @author pengys5
*/
public class SegmentTable extends CommonTable {
public static final String TABLE = "segment";
public static final String COLUMN_DATA_BINARY = "data_binary";
}
......@@ -3,4 +3,5 @@ org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEs
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameEsDAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentEsDAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingEsDAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceEsDAO
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentEsDAO
\ No newline at end of file
......@@ -3,4 +3,5 @@ org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2
org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.ServiceNameH2DAO
org.skywalking.apm.collector.agentstream.worker.node.component.dao.NodeComponentH2DAO
org.skywalking.apm.collector.agentstream.worker.node.mapping.dao.NodeMappingH2DAO
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.NodeReferenceH2DAO
org.skywalking.apm.collector.agentstream.worker.segment.dao.SegmentH2DAO
\ No newline at end of file
......@@ -7,6 +7,8 @@ org.skywalking.apm.collector.agentstream.worker.node.mapping.NodeMappingPersiste
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefAggregationWorker$Factory
org.skywalking.apm.collector.agentstream.worker.noderef.reference.NodeRefPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.segment.SegmentPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterSerialWorker$Factory
\ No newline at end of file
......@@ -14,4 +14,7 @@ org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceEsTabl
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameH2TableDefine
org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.segment.define.SegmentH2TableDefine
\ No newline at end of file
......@@ -22,6 +22,7 @@ public abstract class StorageInstaller {
for (TableDefine tableDefine : tableDefines) {
if (!isExists(client, tableDefine)) {
logger.info("table: {} not exists", tableDefine.getName());
tableDefine.initialize();
createTable(client, tableDefine);
}
}
......
......@@ -70,13 +70,14 @@ public class ElasticSearchStorageInstaller extends StorageInstaller {
ElasticSearchColumnDefine elasticSearchColumnDefine = (ElasticSearchColumnDefine)columnDefine;
mappingBuilder
.startObject(elasticSearchColumnDefine.getName())
.field("type", elasticSearchColumnDefine.getType())
.field("type", elasticSearchColumnDefine.getType().toLowerCase())
.endObject();
}
mappingBuilder
.endObject()
.endObject();
logger.debug("create elasticsearch index: {}", mappingBuilder.string());
return mappingBuilder;
}
......
......@@ -12,6 +12,6 @@ public class H2ColumnDefine extends ColumnDefine {
}
public enum Type {
Boolean, Varchar, Int, Bigint, Date
Boolean, Varchar, Int, Bigint, BINARY
}
}
......@@ -71,7 +71,7 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
Data data = (Data)message;
dataCache.hold();
if (dataCache.containsKey(data.id())) {
getClusterContext().getDataDefine(data.getDefineId()).mergeData(data, dataCache.get(data.id()));
getRole().dataDefine().mergeData(data, dataCache.get(data.id()));
} else {
dataCache.put(data.id(), data);
}
......
......@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
* @author pengys5
*/
public enum AttributeType {
STRING, LONG, FLOAT, INTEGER
STRING, LONG, FLOAT, INTEGER, BYTE
}
package org.skywalking.apm.collector.stream.worker.impl.data;
import com.google.protobuf.ByteString;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.selector.AbstractHashMessage;
......@@ -12,22 +13,27 @@ public class Data extends AbstractHashMessage {
private final int longCapacity;
private final int floatCapacity;
private final int integerCapacity;
private final int byteCapacity;
private String[] dataStrings;
private Long[] dataLongs;
private Float[] dataFloats;
private Integer[] dataIntegers;
private byte[][] dataBytes;
public Data(String id, int defineId, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity) {
public Data(String id, int defineId, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity,
int byteCapacity) {
super(id);
this.defineId = defineId;
this.dataStrings = new String[stringCapacity];
this.dataLongs = new Long[longCapacity];
this.dataFloats = new Float[floatCapacity];
this.dataIntegers = new Integer[integerCapacity];
this.dataBytes = new byte[byteCapacity][];
this.stringCapacity = stringCapacity;
this.longCapacity = longCapacity;
this.floatCapacity = floatCapacity;
this.integerCapacity = integerCapacity;
this.byteCapacity = byteCapacity;
}
public void setDataString(int position, String value) {
......@@ -46,6 +52,10 @@ public class Data extends AbstractHashMessage {
dataIntegers[position] = value;
}
public void setDataBytes(int position, byte[] dataBytes) {
this.dataBytes[position] = dataBytes;
}
public String getDataString(int position) {
return dataStrings[position];
}
......@@ -62,6 +72,10 @@ public class Data extends AbstractHashMessage {
return dataIntegers[position];
}
public byte[] getDataBytes(int position) {
return dataBytes[position];
}
public String id() {
return dataStrings[0];
}
......@@ -76,6 +90,7 @@ public class Data extends AbstractHashMessage {
builder.setFloatCapacity(floatCapacity);
builder.setStringCapacity(stringCapacity);
builder.setLongCapacity(longCapacity);
builder.setByteCapacity(byteCapacity);
for (int i = 0; i < dataStrings.length; i++) {
builder.setDataStrings(i, dataStrings[i]);
......@@ -89,6 +104,9 @@ public class Data extends AbstractHashMessage {
for (int i = 0; i < dataLongs.length; i++) {
builder.setDataLongs(i, dataLongs[i]);
}
for (int i = 0; i < dataBytes.length; i++) {
builder.setDataBytes(i, ByteString.copyFrom(dataBytes[i]));
}
return builder.build();
}
}
......@@ -11,6 +11,7 @@ public abstract class DataDefine {
private int longCapacity;
private int floatCapacity;
private int integerCapacity;
private int byteCapacity;
public DataDefine() {
initial();
......@@ -28,6 +29,8 @@ public abstract class DataDefine {
floatCapacity++;
} else if (AttributeType.INTEGER.equals(attribute.getType())) {
integerCapacity++;
} else if (AttributeType.BYTE.equals(attribute.getType())) {
byteCapacity++;
}
}
}
......@@ -43,7 +46,7 @@ public abstract class DataDefine {
protected abstract void attributeDefine();
public final Data build(String id) {
return new Data(id, defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity);
return new Data(id, defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity, byteCapacity);
}
public void mergeData(Data newData, Data oldData) {
......@@ -51,6 +54,7 @@ public abstract class DataDefine {
int longPosition = 0;
int floatPosition = 0;
int integerPosition = 0;
int bytePosition = 0;
for (int i = 0; i < initialCapacity(); i++) {
Attribute attribute = attributes[i];
if (AttributeType.STRING.equals(attribute.getType())) {
......@@ -65,6 +69,9 @@ public abstract class DataDefine {
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataInteger(integerPosition), oldData.getDataInteger(integerPosition));
integerPosition++;
} else if (AttributeType.BYTE.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataBytes(bytePosition), oldData.getDataBytes(bytePosition));
bytePosition++;
}
}
}
......
......@@ -11,4 +11,6 @@ public interface Operation {
Float operate(Float newValue, Float oldValue);
Integer operate(Integer newValue, Integer oldValue);
byte[] operate(byte[] newValue, byte[] oldValue);
}
......@@ -21,4 +21,8 @@ public class CoverOperation implements Operation {
@Override public Integer operate(Integer newValue, Integer oldValue) {
return newValue;
}
@Override public byte[] operate(byte[] newValue, byte[] oldValue) {
return newValue;
}
}
......@@ -21,4 +21,8 @@ public class NonOperation implements Operation {
@Override public Integer operate(Integer newValue, Integer oldValue) {
return oldValue;
}
@Override public byte[] operate(byte[] newValue, byte[] oldValue) {
return oldValue;
}
}
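Both Operation implementations now handle the new byte[] attribute type the same way they handle the scalar types: CoverOperation always returns the incoming value, NonOperation always keeps the existing one. A minimal illustration (the literal arrays are placeholders):

    byte[] oldValue = {1, 2};
    byte[] newValue = {3, 4};

    new CoverOperation().operate(newValue, oldValue);  // -> newValue, i.e. {3, 4}
    new NonOperation().operate(newValue, oldValue);    // -> oldValue, i.e. {1, 2}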
......@@ -18,10 +18,12 @@ message RemoteData {
int32 longCapacity = 2;
int32 floatCapacity = 3;
int32 integerCapacity = 4;
repeated string dataStrings = 5;
repeated int64 dataLongs = 6;
repeated float dataFloats = 7;
repeated int32 dataIntegers = 8;
int32 byteCapacity = 5;
repeated string dataStrings = 6;
repeated int64 dataLongs = 7;
repeated float dataFloats = 8;
repeated int32 dataIntegers = 9;
repeated bytes dataBytes = 10;
}
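Note that inserting byteCapacity as field 5 renumbers the existing repeated fields (dataStrings 5 -> 6, dataLongs 6 -> 7, dataFloats 7 -> 8, dataIntegers 8 -> 9). Because protobuf identifies fields by number on the wire, RemoteData messages produced by collectors built before this commit are not compatible with collectors built after it, so all nodes exchanging RemoteData need to be upgraded together.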
message Empty {
......