提交 dbdd0838 编写于 作者: P pengys5

Add the implementation of deserialize and serialize method in data define class.

上级 dae5c692
......@@ -91,10 +91,10 @@ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanLis
logger.debug("node reference summary listener build");
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
for (NodeReferenceDataDefine.NodeReference referenceSum : nodeReferences) {
for (NodeReferenceDataDefine.NodeReference nodeReference : nodeReferences) {
try {
logger.debug("send to node reference summary aggregation worker, id: {}", referenceSum.getId());
context.getClusterWorkerContext().lookup(NodeReferenceAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData());
logger.debug("send to node reference summary aggregation worker, id: {}", nodeReference.getId());
context.getClusterWorkerContext().lookup(NodeReferenceAggregationWorker.WorkerRole.INSTANCE).tell(nodeReference.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
......
package org.skywalking.apm.collector.storage.define.global;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.storage.define.DataDefine;
......@@ -26,21 +26,11 @@ public class GlobalTraceDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String globalTraceId = remoteData.getDataStrings(2);
Long timeBucket = remoteData.getDataLongs(0);
return new GlobalTrace(id, segmentId, globalTraceId, timeBucket);
return null;
}
@Override public RemoteData serialize(Object object) {
GlobalTrace globalTrace = (GlobalTrace)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(globalTrace.getId());
builder.addDataStrings(globalTrace.getSegmentId());
builder.addDataStrings(globalTrace.getGlobalTraceId());
builder.addDataLongs(globalTrace.getTimeBucket());
return builder.build();
return null;
}
public static class GlobalTrace implements Transform {
......@@ -49,13 +39,6 @@ public class GlobalTraceDataDefine extends DataDefine {
private String globalTraceId;
private long timeBucket;
GlobalTrace(String id, String segmentId, String globalTraceId, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.globalTraceId = globalTraceId;
this.timeBucket = timeBucket;
}
public GlobalTrace() {
}
......
......@@ -29,11 +29,25 @@ public class InstPerformanceDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
Data data = build(remoteData.getDataStrings(0));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataInteger(2, remoteData.getDataIntegers(2));
data.setDataLong(0, remoteData.getDataLongs(0));
data.setDataLong(1, remoteData.getDataLongs(1));
return data;
}
@Override public RemoteData serialize(Object object) {
return null;
Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataIntegers(data.getDataInteger(2));
builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(data.getDataLong(1));
return builder.build();
}
public static class InstPerformance implements Transform<InstPerformance> {
......@@ -44,16 +58,6 @@ public class InstPerformanceDataDefine extends DataDefine {
private long costTotal;
private long timeBucket;
public InstPerformance(String id, int applicationId, int instanceId, int calls, long costTotal,
long timeBucket) {
this.id = id;
this.applicationId = applicationId;
this.instanceId = instanceId;
this.calls = calls;
this.costTotal = costTotal;
this.timeBucket = timeBucket;
}
public InstPerformance() {
}
......
......@@ -28,11 +28,25 @@ public class NodeComponentDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
Data data = build(remoteData.getDataStrings(0));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(2, remoteData.getDataStrings(2));
data.setDataLong(0, remoteData.getDataLongs(0));
return data;
}
@Override public RemoteData serialize(Object object) {
return null;
Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataStrings(data.getDataString(1));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(2));
builder.addDataLongs(data.getDataLong(0));
return builder.build();
}
public static class NodeComponent implements Transform<NodeComponent> {
......@@ -43,16 +57,6 @@ public class NodeComponentDataDefine extends DataDefine {
private String peer;
private long timeBucket;
public NodeComponent(String id, Integer componentId, String componentName, Integer peerId, String peer,
long timeBucket) {
this.id = id;
this.componentId = componentId;
this.componentName = componentName;
this.peerId = peerId;
this.peer = peer;
this.timeBucket = timeBucket;
}
public NodeComponent() {
}
......
......@@ -27,11 +27,23 @@ public class NodeMappingDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
Data data = build(remoteData.getDataStrings(0));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataLong(0, remoteData.getDataLongs(0));
return data;
}
@Override public RemoteData serialize(Object object) {
return null;
Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(1));
builder.addDataLongs(data.getDataLong(0));
return builder.build();
}
public static class NodeMapping implements Transform<NodeMapping> {
......@@ -41,14 +53,6 @@ public class NodeMappingDataDefine extends DataDefine {
private String address;
private long timeBucket;
public NodeMapping(String id, int applicationId, int addressId, String address, long timeBucket) {
this.id = id;
this.applicationId = applicationId;
this.addressId = addressId;
this.address = address;
this.timeBucket = timeBucket;
}
public NodeMapping() {
}
......
......@@ -33,18 +33,18 @@ public class NodeReferenceDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
int frontApplicationId = remoteData.getDataIntegers(0);
int behindApplicationId = remoteData.getDataIntegers(1);
String behindPeer = remoteData.getDataStrings(1);
int s1LTE = remoteData.getDataIntegers(2);
int s3LTE = remoteData.getDataIntegers(3);
int s5LTE = remoteData.getDataIntegers(4);
int s5GT = remoteData.getDataIntegers(5);
int summary = remoteData.getDataIntegers(6);
int error = remoteData.getDataIntegers(7);
long timeBucket = remoteData.getDataLongs(0);
return new NodeReference(id, frontApplicationId, behindApplicationId, behindPeer, s1LTE, s3LTE, s5LTE, s5GT, summary, error, timeBucket);
Data data = build(remoteData.getDataStrings(0));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(2, remoteData.getDataIntegers(2));
data.setDataInteger(3, remoteData.getDataIntegers(3));
data.setDataInteger(4, remoteData.getDataIntegers(4));
data.setDataInteger(5, remoteData.getDataIntegers(5));
data.setDataInteger(6, remoteData.getDataIntegers(6));
data.setDataInteger(7, remoteData.getDataIntegers(7));
data.setDataLong(0, remoteData.getDataLongs(0));
return data;
}
@Override public RemoteData serialize(Object object) {
......@@ -77,22 +77,6 @@ public class NodeReferenceDataDefine extends DataDefine {
private int error = 0;
private long timeBucket;
public NodeReference(String id, int frontApplicationId, int behindApplicationId, String behindPeer, int s1LTE,
int s3LTE,
int s5LTE, int s5GT, int summary, int error, long timeBucket) {
this.id = id;
this.frontApplicationId = frontApplicationId;
this.behindApplicationId = behindApplicationId;
this.behindPeer = behindPeer;
this.s1LTE = s1LTE;
this.s3LTE = s3LTE;
this.s5LTE = s5LTE;
this.s5GT = s5GT;
this.summary = summary;
this.error = error;
this.timeBucket = timeBucket;
}
public NodeReference() {
}
......
package org.skywalking.apm.collector.storage.define.segment;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......@@ -30,29 +30,11 @@ public class SegmentCostDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
String segmentId = remoteData.getDataStrings(1);
String serviceName = remoteData.getDataStrings(2);
Long cost = remoteData.getDataLongs(0);
Long startTime = remoteData.getDataLongs(1);
Long endTime = remoteData.getDataLongs(2);
Boolean isError = remoteData.getDataBooleans(0);
Long timeBucket = remoteData.getDataLongs(2);
return new SegmentCost(id, segmentId, serviceName, cost, startTime, endTime, isError, timeBucket);
return null;
}
@Override public RemoteData serialize(Object object) {
SegmentCost segmentCost = (SegmentCost)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(segmentCost.getId());
builder.addDataStrings(segmentCost.getSegmentId());
builder.addDataStrings(segmentCost.getServiceName());
builder.addDataLongs(segmentCost.getCost());
builder.addDataLongs(segmentCost.getStartTime());
builder.addDataLongs(segmentCost.getEndTime());
builder.addDataBooleans(segmentCost.isError());
builder.addDataLongs(segmentCost.getTimeBucket());
return builder.build();
return null;
}
public static class SegmentCost implements Transform {
......@@ -65,18 +47,6 @@ public class SegmentCostDataDefine extends DataDefine {
private boolean isError;
private long timeBucket;
SegmentCost(String id, String segmentId, String serviceName, Long cost,
Long startTime, Long endTime, boolean isError, long timeBucket) {
this.id = id;
this.segmentId = segmentId;
this.serviceName = serviceName;
this.cost = cost;
this.startTime = startTime;
this.endTime = endTime;
this.isError = isError;
this.timeBucket = timeBucket;
}
public SegmentCost() {
}
......
package org.skywalking.apm.collector.storage.define.segment;
import com.google.protobuf.ByteString;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.core.stream.Transform;
import org.skywalking.apm.collector.core.stream.operate.CoverOperation;
import org.skywalking.apm.collector.core.stream.operate.NonOperation;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.storage.define.Attribute;
import org.skywalking.apm.collector.storage.define.AttributeType;
import org.skywalking.apm.collector.storage.define.DataDefine;
/**
* @author pengys5
......@@ -25,28 +24,17 @@ public class SegmentDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
byte[] dataBinary = remoteData.getDataBytes(0).toByteArray();
return new Segment(id, dataBinary);
return null;
}
@Override public RemoteData serialize(Object object) {
Segment segment = (Segment)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(segment.getId());
builder.addDataBytes(ByteString.copyFrom(segment.getDataBinary()));
return builder.build();
return null;
}
public static class Segment implements Transform {
private String id;
private byte[] dataBinary;
public Segment(String id, byte[] dataBinary) {
this.id = id;
this.dataBinary = dataBinary;
}
public Segment() {
}
......
......@@ -28,11 +28,25 @@ public class ServiceEntryDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
return null;
Data data = build(remoteData.getDataStrings(0));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataLong(0, remoteData.getDataLongs(0));
data.setDataLong(1, remoteData.getDataLongs(1));
return data;
}
@Override public RemoteData serialize(Object object) {
return null;
Data data = (Data)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(data.getDataString(0));
builder.addDataIntegers(data.getDataInteger(0));
builder.addDataIntegers(data.getDataInteger(1));
builder.addDataStrings(data.getDataString(1));
builder.addDataLongs(data.getDataLong(0));
builder.addDataLongs(data.getDataLong(1));
return builder.build();
}
public static class ServiceEntry implements Transform<ServiceEntry> {
......
......@@ -37,23 +37,22 @@ public class ServiceReferenceDataDefine extends DataDefine {
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
int entryServiceId = remoteData.getDataIntegers(0);
String entryServiceName = remoteData.getDataStrings(1);
int frontServiceId = remoteData.getDataIntegers(1);
String frontServiceName = remoteData.getDataStrings(2);
int behindServiceId = remoteData.getDataIntegers(2);
String behindServiceName = remoteData.getDataStrings(3);
long s1Lte = remoteData.getDataLongs(0);
long s3Lte = remoteData.getDataLongs(1);
long s5Lte = remoteData.getDataLongs(2);
long s5Gt = remoteData.getDataLongs(3);
long summary = remoteData.getDataLongs(4);
long error = remoteData.getDataLongs(5);
long costSummary = remoteData.getDataLongs(6);
long timeBucket = remoteData.getDataLongs(7);
return new ServiceReference(id, entryServiceId, entryServiceName, frontServiceId, frontServiceName,
behindServiceId, behindServiceName, s1Lte, s3Lte, s5Lte, s5Gt, summary, error, costSummary, timeBucket);
Data data = build(remoteData.getDataStrings(0));
data.setDataInteger(0, remoteData.getDataIntegers(0));
data.setDataString(1, remoteData.getDataStrings(1));
data.setDataInteger(1, remoteData.getDataIntegers(1));
data.setDataString(2, remoteData.getDataStrings(2));
data.setDataInteger(2, remoteData.getDataIntegers(2));
data.setDataString(3, remoteData.getDataStrings(3));
data.setDataLong(0, remoteData.getDataLongs(0));
data.setDataLong(1, remoteData.getDataLongs(1));
data.setDataLong(2, remoteData.getDataLongs(2));
data.setDataLong(3, remoteData.getDataLongs(3));
data.setDataLong(4, remoteData.getDataLongs(4));
data.setDataLong(5, remoteData.getDataLongs(5));
data.setDataLong(6, remoteData.getDataLongs(6));
data.setDataLong(7, remoteData.getDataLongs(7));
return data;
}
@Override public RemoteData serialize(Object object) {
......@@ -94,26 +93,6 @@ public class ServiceReferenceDataDefine extends DataDefine {
private long costSummary;
private long timeBucket;
public ServiceReference(String id, int entryServiceId, String entryServiceName, int frontServiceId,
String frontServiceName, int behindServiceId, String behindServiceName, long s1Lte, long s3Lte, long s5Lte,
long s5Gt, long summary, long error, long costSummary, long timeBucket) {
this.id = id;
this.entryServiceId = entryServiceId;
this.entryServiceName = entryServiceName;
this.frontServiceId = frontServiceId;
this.frontServiceName = frontServiceName;
this.behindServiceId = behindServiceId;
this.behindServiceName = behindServiceName;
this.s1Lte = s1Lte;
this.s3Lte = s3Lte;
this.s5Lte = s5Lte;
this.s5Gt = s5Gt;
this.summary = summary;
this.error = error;
this.costSummary = costSummary;
this.timeBucket = timeBucket;
}
public ServiceReference() {
}
......
......@@ -2,8 +2,8 @@ package org.skywalking.apm.collector.stream.worker.selector;
import java.util.List;
import org.skywalking.apm.collector.core.stream.AbstractHashMessage;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
import org.skywalking.apm.collector.stream.worker.AbstractWorker;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef}
......@@ -30,7 +30,7 @@ public class HashCodeSelector implements WorkerSelector<WorkerRef> {
int selectIndex = Math.abs(hashMessage.getHashCode()) % size;
return members.get(selectIndex);
} else {
throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage");
throw new IllegalArgumentException("the message send into HashCodeSelector must implementation of AbstractHashMessage, the message object class is: " + message.getClass().getName());
}
}
}
......@@ -136,10 +136,11 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
JsonArray serviceReferenceArray = new JsonArray();
JsonObject rootServiceReference = findRoot(serviceReferenceMap);
if (ObjectUtils.isNotEmpty(rootServiceReference)) {
serviceReferenceArray.add(rootServiceReference);
String id = rootServiceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)) + Const.ID_SPLIT + rootServiceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID));
serviceReferenceMap.remove(id);
int rootServiceId = rootServiceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).getAsInt();
int rootServiceId = rootServiceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).getAsInt();
sortAsTree(rootServiceId, serviceReferenceArray, serviceReferenceMap);
}
return serviceReferenceArray;
......@@ -244,8 +245,8 @@ public class ServiceReferenceEsDAO extends EsDAO implements IServiceReferenceDAO
private JsonObject findRoot(Map<String, JsonObject> serviceReferenceMap) {
for (JsonObject serviceReference : serviceReferenceMap.values()) {
int behindServiceId = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).getAsInt();
if (behindServiceId == 1) {
int frontServiceId = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).getAsInt();
if (frontServiceId == 1) {
return serviceReference;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册