提交 70fedb10 编写于 作者: P pengys5

Rename data entity

上级 c43d375d
......@@ -46,6 +46,6 @@ public abstract class AbstractWorker {
}
final protected void saveException(Exception e) {
// e.printStackTrace();
logger().error(e);
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitAnalysisData;
/**
 * Analysis member that joins string attribute values per data id and, on each
 * aggregation round, ships the joined {@link com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData}
 * entries to the aggregation worker refs.
 *
 * @author pengys5
 */
public abstract class JoinAndSplitAnalysisMember extends AnalysisMember {

    private final JoinAndSplitAnalysisData joinAndSplitAnalysisData;

    public JoinAndSplitAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
        this.joinAndSplitAnalysisData = new JoinAndSplitAnalysisData();
    }

    private JoinAndSplitAnalysisData getJoinAndSplitAnalysisData() {
        return joinAndSplitAnalysisData;
    }

    /**
     * Record one attribute value under the given id; values sharing an
     * attribute name are joined inside the backing data entity.
     */
    final protected void set(String id, String attributeName, String value) throws Exception {
        getJoinAndSplitAnalysisData().getOrCreate(id).set(attributeName, value);
    }

    /**
     * Push every held entry to the aggregation workers, then clear the window
     * so the next round starts empty. A failed tell is logged, not rethrown,
     * so the remaining entries are still delivered.
     */
    @Override
    final protected void aggregation() throws Exception {
        getJoinAndSplitAnalysisData().asMap().forEach((id, entity) -> {
            try {
                aggWorkRefs().tell(entity);
            } catch (Exception e) {
                logger().error(e);
            }
        });
        getJoinAndSplitAnalysisData().asMap().clear();
    }

    /**
     * @return the worker refs that receive the aggregated entities
     */
    protected abstract WorkerRefs aggWorkRefs();
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitPersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import java.util.List;
import java.util.Map;
/**
 * Persistence member that merges incoming {@link JoinAndSplitData} messages
 * into a held window and flushes the previous window's content into
 * ElasticSearch index requests.
 *
 * @author pengys5
 */
public abstract class JoinAndSplitPersistenceMember extends PersistenceMember<JoinAndSplitPersistenceData, JoinAndSplitData> {

    private Logger logger = LogManager.getFormatterLogger(JoinAndSplitPersistenceMember.class);

    protected JoinAndSplitPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }

    @Override
    public JoinAndSplitPersistenceData initializeData() {
        return new JoinAndSplitPersistenceData();
    }

    /**
     * Merge a {@link JoinAndSplitData} message into the window entry with the
     * same id; any other message type is logged and dropped.
     */
    @Override
    final public void analyse(Object message) throws Exception {
        if (!(message instanceof JoinAndSplitData)) {
            logger.error("unhandled message, message instance must JoinAndSplitData, but is %s", message.getClass().toString());
            return;
        }

        JoinAndSplitData incoming = (JoinAndSplitData) message;
        JoinAndSplitPersistenceData persistenceData = getPersistenceData();
        persistenceData.hold(); // pin the window while mutating it
        persistenceData.getOrCreate(incoming.getId()).merge(incoming);
        persistenceData.release();
    }

    /**
     * Convert every entity of the previous window into an index request,
     * append them to the supplied list, and clear the window.
     */
    @Override
    final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
        Map<String, JoinAndSplitData> lastData = getPersistenceData().getLast().asMap();
        extractData(lastData);

        Client client = EsClient.INSTANCE.getClient();
        lastData.forEach((id, entity) ->
            builderList.add(client.prepareIndex(esIndex(), esType(), id).setSource(entity.asMap())));
        lastData.clear();
    }
}
......@@ -16,8 +16,8 @@ public abstract class MetricAnalysisMember extends AnalysisMember {
super(role, clusterContext, selfContext);
}
final protected void setMetric(String id, String column, Long value) throws Exception {
getMetricAnalysisData().getOrCreate(id).setMetric(column, value);
final protected void set(String id, String metricName, Long value) throws Exception {
getMetricAnalysisData().getOrCreate(id).set(metricName, value);
}
private MetricAnalysisData getMetricAnalysisData() {
......
......@@ -22,8 +22,8 @@ public abstract class RecordAnalysisMember extends AnalysisMember {
super(role, clusterContext, selfContext);
}
final public void setRecord(String id, JsonObject record) throws Exception {
getRecordAnalysisData().getOrCreate(id).setRecord(record);
final public void set(String id, JsonObject record) throws Exception {
getRecordAnalysisData().getOrCreate(id).set(record);
}
private RecordAnalysisData getRecordAnalysisData() {
......
......@@ -29,10 +29,10 @@ public abstract class RecordPersistenceMember extends PersistenceMember<RecordPe
public void analyse(Object message) throws Exception {
if (message instanceof RecordData) {
RecordData recordData = (RecordData) message;
logger().debug("setRecord: id: %s, data: %s", recordData.getId(), recordData.getRecord());
logger().debug("set: id: %s, data: %s", recordData.getId(), recordData.get());
RecordPersistenceData data = getPersistenceData();
data.hold();
data.getOrCreate(recordData.getId()).setRecord(recordData.getRecord());
data.getOrCreate(recordData.getId()).set(recordData.get());
data.release();
} else {
logger().error("message unhandled");
......@@ -46,7 +46,7 @@ public abstract class RecordPersistenceMember extends PersistenceMember<RecordPe
Client client = EsClient.INSTANCE.getClient();
lastData.forEach((key, value) -> {
IndexRequestBuilder builder = client.prepareIndex(esIndex(), esType(), key).setSource(value.getRecord().toString());
IndexRequestBuilder builder = client.prepareIndex(esIndex(), esType(), key).setSource(value.get().toString());
builderList.add(builder);
});
lastData.clear();
......
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.globaltrace.analysis;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergeAnalysisMember;
import com.a.eye.skywalking.collector.worker.JoinAndSplitAnalysisMember;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg;
......@@ -19,7 +19,7 @@ import java.util.List;
/**
* @author pengys5
*/
public class GlobalTraceAnalysis extends MergeAnalysisMember {
public class GlobalTraceAnalysis extends JoinAndSplitAnalysisMember {
private Logger logger = LogManager.getFormatterLogger(GlobalTraceAnalysis.class);
......@@ -37,7 +37,7 @@ public class GlobalTraceAnalysis extends MergeAnalysisMember {
if (CollectionTools.isNotEmpty(globalTraceIdList)) {
for (GlobalTraceId disTraceId : globalTraceIdList) {
String traceId = disTraceId.get();
setMergeData(traceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
set(traceId, GlobalTraceIndex.SUB_SEG_IDS, subSegmentId);
}
}
} else {
......
......@@ -4,7 +4,7 @@ import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -27,10 +27,10 @@ public class GlobalTraceAgg extends AbstractClusterWorker {
@Override
protected void onWork(Object message) throws Exception {
if (message instanceof MergeData) {
if (message instanceof JoinAndSplitData) {
getSelfContext().lookup(GlobalTraceSave.Role.INSTANCE).tell(message);
} else {
logger.error("unhandled message, message instance must MergeData, but is %s", message.getClass().toString());
logger.error("unhandled message, message instance must JoinAndSplitData, but is %s", message.getClass().toString());
}
}
......
......@@ -5,14 +5,14 @@ import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.HashCodeSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.MergePersistenceMember;
import com.a.eye.skywalking.collector.worker.JoinAndSplitPersistenceMember;
import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.storage.PersistenceWorkerListener;
/**
* @author pengys5
*/
public class GlobalTraceSave extends MergePersistenceMember {
public class GlobalTraceSave extends JoinAndSplitPersistenceMember {
GlobalTraceSave(com.a.eye.skywalking.collector.actor.Role role, ClusterWorkerContext clusterContext,
LocalWorkerContext selfContext) {
......
......@@ -8,7 +8,7 @@ import com.a.eye.skywalking.collector.worker.globaltrace.GlobalTraceIndex;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.entity.*;
import com.a.eye.skywalking.collector.worker.storage.GetResponseFromEs;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
......@@ -41,7 +41,7 @@ public class GlobalTraceSearchWithGlobalId extends AbstractLocalSyncWorker {
logger.debug("globalTraceObj: %s", globalTraceObj);
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SUB_SEG_IDS).getAsString();
String[] subSegIds = subSegIdsStr.split(MergeData.SPLIT);
String[] subSegIds = subSegIdsStr.split(JoinAndSplitData.SPLIT);
List<SpanView> spanViewList = new ArrayList<>();
for (String subSegId : subSegIds) {
......
package com.a.eye.skywalking.collector.worker.httpserver;
import com.a.eye.skywalking.collector.actor.*;
import com.a.eye.skywalking.collector.actor.AbstractLocalAsyncWorker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalAsyncWorkerRef;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import java.io.BufferedReader;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @author pengys5
......@@ -21,8 +26,7 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
super(role, clusterContext, selfContext);
}
@Override
final public void onWork(Object message) throws Exception {
@Override final public void onWork(Object message) throws Exception {
onReceive(message);
}
......@@ -30,21 +34,23 @@ public abstract class AbstractPost extends AbstractLocalAsyncWorker {
static class PostWithHttpServlet extends AbstractHttpServlet {
private Logger logger = LogManager.getFormatterLogger(PostWithHttpServlet.class);
private final LocalAsyncWorkerRef ownerWorkerRef;
PostWithHttpServlet(LocalAsyncWorkerRef ownerWorkerRef) {
this.ownerWorkerRef = ownerWorkerRef;
}
@Override
final protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@Override final protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
JsonObject resJson = new JsonObject();
try {
BufferedReader bufferedReader = request.getReader();
streamReader(bufferedReader);
reply(response, resJson, HttpServletResponse.SC_OK);
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
resJson.addProperty("error", e.getMessage());
reply(response, resJson, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
......
......@@ -43,7 +43,7 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
compJsonObj.addProperty(NodeCompIndex.PEERS, peers);
compJsonObj.addProperty(NodeCompIndex.NAME, Tags.COMPONENT.get(span));
setRecord(peers, compJsonObj);
set(peers, compJsonObj);
} else if (Tags.SPAN_KIND_SERVER.equals(kind) && span.getParentSpanId() == -1) {
String peers = segment.getApplicationCode();
......@@ -51,7 +51,7 @@ abstract class AbstractNodeCompAnalysis extends RecordAnalysisMember {
compJsonObj.addProperty(NodeCompIndex.PEERS, peers);
compJsonObj.addProperty(NodeCompIndex.NAME, Tags.COMPONENT.get(span));
setRecord(peers, compJsonObj);
set(peers, compJsonObj);
}
}
}
......
......@@ -42,7 +42,7 @@ abstract class AbstractNodeMappingAnalysis extends RecordAnalysisMember {
nodeMappingJsonObj.addProperty(NodeMappingIndex.TIME_SLICE, timeSlice);
String id = timeSlice + Const.ID_SPLIT + code + Const.ID_SPLIT + peers;
setRecord(id, nodeMappingJsonObj);
set(id, nodeMappingJsonObj);
}
}
}
......
......@@ -49,7 +49,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
String id = timeSlice + Const.ID_SPLIT + front + Const.ID_SPLIT + behind;
logger.debug("dag node ref: %s", dataJsonObj.toString());
setRecord(id, dataJsonObj);
set(id, dataJsonObj);
buildNodeRefResRecordData(id, span, minute, hour, day, second);
} else if (Tags.SPAN_KIND_SERVER.equals(Tags.SPAN_KIND.get(span))) {
if (span.getParentSpanId() == -1 && CollectionTools.isEmpty(segment.getRefs())) {
......@@ -60,7 +60,7 @@ abstract class AbstractNodeRefAnalysis extends RecordAnalysisMember {
dataJsonObj.addProperty(NodeRefIndex.FRONT, front);
String id = timeSlice + Const.ID_SPLIT + front + Const.ID_SPLIT + behind;
setRecord(id, dataJsonObj);
set(id, dataJsonObj);
buildNodeRefResRecordData(id, span, minute, hour, day, second);
}
}
......
......@@ -22,23 +22,23 @@ abstract class AbstractNodeRefResSumAnalysis extends MetricAnalysisMember {
boolean isError = nodeRefRes.isError;
long cost = endTime - startTime;
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ONE_SECOND_LESS, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.THREE_SECOND_LESS, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_LESS, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_GREATER, 0L);
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ERROR, 0L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.ONE_SECOND_LESS, 0L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.THREE_SECOND_LESS, 0L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_LESS, 0L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_GREATER, 0L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.ERROR, 0L);
if (cost <= 1000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ONE_SECOND_LESS, 1L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.ONE_SECOND_LESS, 1L);
} else if (1000 < cost && cost <= 3000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.THREE_SECOND_LESS, 1L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.THREE_SECOND_LESS, 1L);
} else if (3000 < cost && cost <= 5000 && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_LESS, 1L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_LESS, 1L);
} else if (5000 < cost && !isError) {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_GREATER, 1L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.FIVE_SECOND_GREATER, 1L);
} else {
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.ERROR, 1L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.ERROR, 1L);
}
setMetric(nodeRefRes.nodeRefId, NodeRefResSumIndex.SUMMARY, 1L);
set(nodeRefRes.nodeRefId, NodeRefResSumIndex.SUMMARY, 1L);
}
public static class NodeRefResRecord extends AbstractTimeSlice {
......
......@@ -54,7 +54,7 @@ public class SegmentCostAnalysis extends RecordAnalysisMember {
cost = 1;
}
dataJsonObj.addProperty(SegmentCostIndex.COST, cost);
setRecord(segment.getTraceSegmentId(), dataJsonObj);
set(segment.getTraceSegmentId(), dataJsonObj);
}
}
}
......
......@@ -60,7 +60,7 @@ public class SegmentExceptionAnalysis extends RecordAnalysisMember {
}
}
dataJsonObj.add(SegmentExceptionIndex.ERROR_KIND, errorKind);
setRecord(segment.getTraceSegmentId(), dataJsonObj);
set(segment.getTraceSegmentId(), dataJsonObj);
}
}
} else {
......
......@@ -11,7 +11,7 @@ import com.a.eye.skywalking.collector.worker.segment.entity.GlobalTraceId;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.segment.entity.SegmentDeserialize;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import com.a.eye.skywalking.collector.worker.tools.CollectionTools;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
......@@ -55,7 +55,7 @@ public class SegmentTopSearchWithGlobalTraceId extends AbstractLocalSyncWorker {
if (globalTraceObj != null && globalTraceObj.has(GlobalTraceIndex.SUB_SEG_IDS)) {
String subSegIdsStr = globalTraceObj.get(GlobalTraceIndex.SUB_SEG_IDS).getAsString();
String[] subSegIds = subSegIdsStr.split(MergeData.SPLIT);
String[] subSegIds = subSegIdsStr.split(JoinAndSplitData.SPLIT);
topSegPaging.addProperty("recordsTotal", subSegIds.length);
......
package com.a.eye.skywalking.collector.worker.storage;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * Analysis-side window holder for {@link JoinAndSplitData} entities, keyed by
 * data id. Backed by a {@link LinkedHashMap}, so iteration over
 * {@link #asMap()} follows insertion order.
 *
 * @author pengys5
 */
public class JoinAndSplitAnalysisData {

    // Fixed: the original instantiated a raw WindowData, producing an
    // unchecked-assignment warning; the type argument is now explicit.
    private WindowData<JoinAndSplitData> windowData =
        new WindowData<>(new LinkedHashMap<String, JoinAndSplitData>());

    /**
     * Return the entity stored under the given id, creating and registering a
     * fresh one on first access.
     */
    public JoinAndSplitData getOrCreate(String id) {
        if (!windowData.containsKey(id)) {
            windowData.put(id, new JoinAndSplitData(id));
        }
        return windowData.get(id);
    }

    /**
     * Expose the window content as a map for iteration and clearing.
     */
    public Map<String, JoinAndSplitData> asMap() {
        return windowData.asMap();
    }
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage;
import java.util.*;
/**
 * Data entity that joins many string values per attribute name and can split
 * a previously joined storage value back into its parts using {@link #SPLIT}.
 *
 * @author pengys5
 */
public class JoinAndSplitData extends AbstractHashMessage implements Data {

    /** Separator used when joining values into one column string. */
    public static final String SPLIT = ",";

    private String id;
    private Map<String, Set<String>> data = new HashMap<>();

    public JoinAndSplitData(String key) {
        super(key);
        this.id = key;
    }

    public String getId() {
        return id;
    }

    /**
     * Remember one value for the attribute; duplicates collapse in the set.
     */
    public void set(String attributeName, String value) {
        data.computeIfAbsent(attributeName, name -> new HashSet<>()).add(value);
    }

    /**
     * Merge every attribute/value pair of another entity into this one.
     */
    public void merge(JoinAndSplitData source) {
        source.data.forEach((attributeName, valueSet) ->
            valueSet.forEach(value -> set(attributeName, value)));
    }

    /**
     * Merge a row loaded from storage: each joined column value is split on
     * {@link #SPLIT} first. Index bookkeeping columns are skipped.
     */
    public void merge(Map<String, ?> source) {
        source.forEach((column, stored) -> {
            if (!AbstractIndex.TIME_SLICE.equals(column) && !AbstractIndex.AGG_COLUMN.equals(column)) {
                for (String part : String.valueOf(stored).split(SPLIT)) {
                    set(column, part);
                }
            }
        });
    }

    /**
     * Render each attribute's value set as one joined string, ready to be
     * written as a storage column.
     */
    public Map<String, String> asMap() {
        Map<String, String> joined = new HashMap<>();
        data.forEach((attributeName, valueSet) ->
            joined.put(attributeName, String.join(SPLIT, valueSet)));
        return joined;
    }
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.storage;
/**
 * Window-based persistence container for {@link JoinAndSplitData}. Callers
 * must bracket mutation with {@link #hold()} and {@link #release()} so the
 * current window cannot be swapped away mid-update.
 *
 * @author pengys5
 */
public class JoinAndSplitPersistenceData extends Window<JoinAndSplitData> implements PersistenceData<JoinAndSplitData> {

    // Window snapshot pinned by hold(); null whenever not held.
    private WindowData<JoinAndSplitData> lockedWindowData;

    /**
     * Fetch the entity for the id from the held window, creating it first if
     * this id has not been seen. Only valid between hold() and release().
     */
    public JoinAndSplitData getOrCreate(String id) {
        boolean missing = !lockedWindowData.containsKey(id);
        if (missing) {
            lockedWindowData.put(id, new JoinAndSplitData(id));
        }
        return lockedWindowData.get(id);
    }

    /** Pin the current window for exclusive mutation. */
    public void hold() {
        lockedWindowData = getCurrentAndHold();
    }

    /** Unlock the pinned window and drop the local reference. */
    public void release() {
        lockedWindowData.release();
        lockedWindowData = null;
    }
}
......@@ -12,12 +12,12 @@ import java.util.Map;
public class MetricData extends AbstractHashMessage implements Data {
private String id;
private Map<String, Object> value;
private Map<String, Object> data;
public MetricData(String id) {
super(id);
this.id = id;
value = new LinkedHashMap<>();
data = new LinkedHashMap<>();
String[] ids = id.split(Const.IDS_SPLIT);
String slice = ids[0];
......@@ -27,43 +27,45 @@ public class MetricData extends AbstractHashMessage implements Data {
else aggId.append(Const.ID_SPLIT).append(ids[i]);
}
value.put(AbstractIndex.TIME_SLICE, Long.valueOf(slice));
value.put(AbstractIndex.AGG_COLUMN, aggId.toString());
data.put(AbstractIndex.TIME_SLICE, Long.valueOf(slice));
data.put(AbstractIndex.AGG_COLUMN, aggId.toString());
}
public void setMetric(String column, Long value) {
public void set(String metricName, Long value) {
long valueAdd = value;
if (this.value.containsKey(column) && !AbstractIndex.TIME_SLICE.equals(column)
&& !AbstractIndex.AGG_COLUMN.equals(column)) {
valueAdd += (Long) this.value.get(column);
if (this.data.containsKey(metricName) && !AbstractIndex.TIME_SLICE.equals(metricName)
&& !AbstractIndex.AGG_COLUMN.equals(metricName)) {
valueAdd += (Long) this.data.get(metricName);
}
this.value.put(column, valueAdd);
this.data.put(metricName, valueAdd);
}
public void merge(MetricData metricData) {
for (Map.Entry<String, Object> entry : metricData.value.entrySet()) {
public void merge(MetricData source) {
for (Map.Entry<String, Object> entry : source.data.entrySet()) {
if (!AbstractIndex.TIME_SLICE.equals(entry.getKey())
&& !AbstractIndex.AGG_COLUMN.equals(entry.getKey())) {
setMetric(entry.getKey(), (Long) entry.getValue());
set(entry.getKey(), (Long) entry.getValue());
}
}
}
public void merge(Map<String, ?> dbData) {
for (Map.Entry<String, ?> entry : dbData.entrySet()) {
@Override
public void merge(Map<String, ?> source) {
for (Map.Entry<String, ?> entry : source.entrySet()) {
if (!AbstractIndex.TIME_SLICE.equals(entry.getKey())
&& !AbstractIndex.AGG_COLUMN.equals(entry.getKey())) {
long dbValue = ((Number) entry.getValue()).longValue();
setMetric(entry.getKey(), dbValue);
set(entry.getKey(), dbValue);
}
}
}
@Override
public String getId() {
return id;
}
public Map<String, Object> asMap() {
return value;
return data;
}
}
......@@ -13,12 +13,12 @@ public class RecordData extends AbstractHashMessage implements Data {
private String id;
private String aggId;
private JsonObject record;
private JsonObject data;
public RecordData(String key) {
super(key);
this.id = key;
String[] ids = id.split(Const.IDS_SPLIT);
public RecordData(String id) {
super(id);
this.id = id;
String[] ids = this.id.split(Const.IDS_SPLIT);
for (int i = 1; i < ids.length; i++) {
if (i == 1) {
this.aggId = ids[i];
......@@ -26,23 +26,23 @@ public class RecordData extends AbstractHashMessage implements Data {
this.aggId += Const.ID_SPLIT + ids[i];
}
}
record = new JsonObject();
data = new JsonObject();
}
public String getId() {
return id;
}
public JsonObject getRecord() {
record.addProperty(AbstractIndex.AGG_COLUMN, this.aggId);
return record;
public JsonObject get() {
data.addProperty(AbstractIndex.AGG_COLUMN, this.aggId);
return data;
}
public void setRecord(JsonObject record) {
this.record = record;
public void set(JsonObject record) {
this.data = record;
}
@Override
public void merge(Map<String, ?> dbData) {
public void merge(Map<String, ?> source) {
}
}
......@@ -10,8 +10,8 @@ public class SegmentData implements Data {
private String id;
private String segmentStr;
public SegmentData(String key) {
this.id = key;
public SegmentData(String id) {
this.id = id;
}
public String getId() {
......@@ -19,7 +19,7 @@ public class SegmentData implements Data {
}
@Override
public void merge(Map<String, ?> dbData) {
public void merge(Map<String, ?> source) {
}
public String getSegmentStr() {
......
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitPersistenceData;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.powermock.api.mockito.PowerMockito.*;
/**
 * Test case for {@link JoinAndSplitPersistenceMember#analyse(Object)}:
 * verifies that a {@link JoinAndSplitData} message is routed into the mocked
 * persistence window without error.
 *
 * @author pengys5
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({TestJoinAndSplitPersistenceMember.class, EsClient.class})
@PowerMockIgnore({"javax.management.*"})
public class JoinAndSplitPersistenceMemberTestCase {

    // Renamed from "mergePersistenceMember" for consistency with the
    // MergePersistenceMember -> JoinAndSplitPersistenceMember rename.
    private TestJoinAndSplitPersistenceMember joinAndSplitPersistenceMember;
    private JoinAndSplitPersistenceData persistenceData;

    @Before
    public void init() throws Exception {
        MockEsBulkClient mockEsBulkClient = new MockEsBulkClient();
        mockEsBulkClient.createMock();

        ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
        LocalWorkerContext localWorkerContext = new LocalWorkerContext();
        joinAndSplitPersistenceMember = PowerMockito.spy(new TestJoinAndSplitPersistenceMember(TestJoinAndSplitPersistenceMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));

        // Stub the persistence window so analyse() never touches a real store.
        persistenceData = mock(JoinAndSplitPersistenceData.class);
        JoinAndSplitData joinAndSplitData = mock(JoinAndSplitData.class);
        when(joinAndSplitPersistenceMember, "getPersistenceData").thenReturn(persistenceData);
        when(persistenceData.getOrCreate(Mockito.anyString())).thenReturn(joinAndSplitData);

        doCallRealMethod().when(joinAndSplitPersistenceMember).analyse(Mockito.any(JoinAndSplitData.class));
    }

    @Test
    public void testAnalyse() throws Exception {
        String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
        JoinAndSplitData joinAndSplitData = new JoinAndSplitData(id);
        joinAndSplitData.set("Column", "VALUE");

        joinAndSplitPersistenceMember.analyse(joinAndSplitData);
    }
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
 * Minimal concrete {@link JoinAndSplitAnalysisMember} used by test cases:
 * analyse() is a no-op and aggWorkRefs() returns null, so only the behavior
 * inherited from the abstract parent is exercised.
 *
 * @author pengys5
 */
public class TestJoinAndSplitAnalysisMember extends JoinAndSplitAnalysisMember {
TestJoinAndSplitAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
// No-op: tests drive the parent's set()/aggregation() directly.
@Override
public void analyse(Object message) throws Exception {
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
// No aggregation target in tests; callers must not dereference the result.
@Override
protected WorkerRefs aggWorkRefs() {
return null;
}
// Stub role: name and selector are unused by the tests, hence null.
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return null;
}
@Override
public WorkerSelector workerSelector() {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
 * Minimal concrete {@link JoinAndSplitPersistenceMember} used by test cases:
 * esIndex()/esType() return null because the ElasticSearch client is mocked
 * and the index names are never read.
 *
 * @author pengys5
 */
public class TestJoinAndSplitPersistenceMember extends JoinAndSplitPersistenceMember {
TestJoinAndSplitPersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return null;
}
@Override
public String esType() {
return null;
}
// Stub role: name and selector are unused by the tests, hence null.
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return null;
}
@Override
public WorkerSelector workerSelector() {
return null;
}
}
}
......@@ -70,7 +70,7 @@ public enum RecordDataMergeJson {
private Map<String, JsonObject> recordData2Map(List<RecordData> recordDataList) {
Map<String, JsonObject> recordDataMap = new HashMap<>();
for (RecordData recordData : recordDataList) {
recordDataMap.put(recordData.getId(), recordData.getRecord());
recordDataMap.put(recordData.getId(), recordData.get());
}
return recordDataMap;
}
......
......@@ -8,7 +8,7 @@ import com.a.eye.skywalking.collector.worker.config.WorkerConfig;
import com.a.eye.skywalking.collector.worker.globaltrace.persistence.GlobalTraceAgg;
import com.a.eye.skywalking.collector.worker.mock.MergeDataAnswer;
import com.a.eye.skywalking.collector.worker.segment.mock.SegmentMock;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -46,7 +46,7 @@ public class GlobalTraceAnalysisTestCase {
clusterWorkerContext = PowerMockito.mock(ClusterWorkerContext.class);
WorkerRefs workerRefs = mock(WorkerRefs.class);
answer = new MergeDataAnswer();
doAnswer(answer).when(workerRefs).tell(Mockito.any(MergeData.class));
doAnswer(answer).when(workerRefs).tell(Mockito.any(JoinAndSplitData.class));
when(clusterWorkerContext.lookup(GlobalTraceAgg.Role.INSTANCE)).thenReturn(workerRefs);
......@@ -75,10 +75,10 @@ public class GlobalTraceAnalysisTestCase {
public void testAnalyse() throws Exception {
segmentMock.executeAnalysis(analysis);
Assert.assertEquals(1, answer.getMergeDataList().size());
MergeData mergeData = answer.getMergeDataList().get(0);
Assert.assertEquals(id, mergeData.getId());
String subSegIds = mergeData.asMap().get("subSegIds").toString();
Assert.assertEquals(1, answer.getJoinAndSplitDataList().size());
JoinAndSplitData joinAndSplitData = answer.getJoinAndSplitDataList().get(0);
Assert.assertEquals(id, joinAndSplitData.getId());
String subSegIds = joinAndSplitData.asMap().get("subSegIds").toString();
Assert.assertEquals(cacheServiceSubSegIds, subSegIds);
}
......
package com.a.eye.skywalking.collector.worker.mock;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public class MergeDataAnswer implements Answer<Object> {
private List<MergeData> mergeDataList = new ArrayList<>();
private List<JoinAndSplitData> joinAndSplitDataList = new ArrayList<>();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
MergeData mergeData = (MergeData)invocation.getArguments()[0];
mergeDataList.add(mergeData);
JoinAndSplitData joinAndSplitData = (JoinAndSplitData)invocation.getArguments()[0];
joinAndSplitDataList.add(joinAndSplitData);
return null;
}
public List<MergeData> getMergeDataList() {
return mergeDataList;
public List<JoinAndSplitData> getJoinAndSplitDataList() {
return joinAndSplitDataList;
}
}
......@@ -16,57 +16,57 @@ public enum NodeAnalysisVerify {
Assert.assertEquals(9, recordDataList.size());
RecordData data_1 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..[localhost:-1]");
Assert.assertEquals("H2", data_1.getRecord().get("component").getAsString());
Assert.assertEquals("[localhost:-1]", data_1.getRecord().get("peers").getAsString());
Assert.assertEquals("[localhost:-1]", data_1.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_1.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals("H2", data_1.get().get("component").getAsString());
Assert.assertEquals("[localhost:-1]", data_1.get().get("peers").getAsString());
Assert.assertEquals("[localhost:-1]", data_1.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_1.get().get("timeSlice").getAsLong());
RecordData data_2 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..portal-service");
Assert.assertEquals("Tomcat", data_2.getRecord().get("component").getAsString());
Assert.assertEquals("portal-service", data_2.getRecord().get("peers").getAsString());
Assert.assertEquals("portal-service", data_2.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_2.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals("Tomcat", data_2.get().get("component").getAsString());
Assert.assertEquals("portal-service", data_2.get().get("peers").getAsString());
Assert.assertEquals("portal-service", data_2.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_2.get().get("timeSlice").getAsLong());
RecordData data_3 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..persistence-service");
Assert.assertEquals(false, data_3.getRecord().has("component"));
Assert.assertEquals("persistence-service", data_3.getRecord().get("peers").getAsString());
Assert.assertEquals("persistence-service", data_3.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_3.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals(false, data_3.get().has("component"));
Assert.assertEquals("persistence-service", data_3.get().get("peers").getAsString());
Assert.assertEquals("persistence-service", data_3.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_3.get().get("timeSlice").getAsLong());
RecordData data_4 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..[10.128.35.80:20880]");
Assert.assertEquals("HttpClient", data_4.getRecord().get("component").getAsString());
Assert.assertEquals("[10.128.35.80:20880]", data_4.getRecord().get("peers").getAsString());
Assert.assertEquals("[10.128.35.80:20880]", data_4.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_4.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals("HttpClient", data_4.get().get("component").getAsString());
Assert.assertEquals("[10.128.35.80:20880]", data_4.get().get("peers").getAsString());
Assert.assertEquals("[10.128.35.80:20880]", data_4.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_4.get().get("timeSlice").getAsLong());
RecordData data_5 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..[127.0.0.1:6379]");
Assert.assertEquals("Redis", data_5.getRecord().get("component").getAsString());
Assert.assertEquals("[127.0.0.1:6379]", data_5.getRecord().get("peers").getAsString());
Assert.assertEquals("[127.0.0.1:6379]", data_5.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_5.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals("Redis", data_5.get().get("component").getAsString());
Assert.assertEquals("[127.0.0.1:6379]", data_5.get().get("peers").getAsString());
Assert.assertEquals("[127.0.0.1:6379]", data_5.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_5.get().get("timeSlice").getAsLong());
RecordData data_6 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..[127.0.0.1:8002]");
Assert.assertEquals("Motan", data_6.getRecord().get("component").getAsString());
Assert.assertEquals("[127.0.0.1:8002]", data_6.getRecord().get("peers").getAsString());
Assert.assertEquals("[127.0.0.1:8002]", data_6.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_6.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals("Motan", data_6.get().get("component").getAsString());
Assert.assertEquals("[127.0.0.1:8002]", data_6.get().get("peers").getAsString());
Assert.assertEquals("[127.0.0.1:8002]", data_6.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_6.get().get("timeSlice").getAsLong());
RecordData data_7 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..User");
Assert.assertEquals(false, data_7.getRecord().has("component"));
Assert.assertEquals("User", data_7.getRecord().get("peers").getAsString());
Assert.assertEquals("User", data_7.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_7.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals(false, data_7.get().has("component"));
Assert.assertEquals("User", data_7.get().get("peers").getAsString());
Assert.assertEquals("User", data_7.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_7.get().get("timeSlice").getAsLong());
RecordData data_8 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..[127.0.0.1:3307]");
Assert.assertEquals("Mysql", data_8.getRecord().get("component").getAsString());
Assert.assertEquals("[127.0.0.1:3307]", data_8.getRecord().get("peers").getAsString());
Assert.assertEquals("[127.0.0.1:3307]", data_8.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_8.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals("Mysql", data_8.get().get("component").getAsString());
Assert.assertEquals("[127.0.0.1:3307]", data_8.get().get("peers").getAsString());
Assert.assertEquals("[127.0.0.1:3307]", data_8.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_8.get().get("timeSlice").getAsLong());
RecordData data_9 = RecordDataTool.INSTANCE.getRecord(recordDataList, timeSlice + "..-..cache-service");
Assert.assertEquals(false, data_9.getRecord().has("component"));
Assert.assertEquals("cache-service", data_9.getRecord().get("peers").getAsString());
Assert.assertEquals("cache-service", data_9.getRecord().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_9.getRecord().get("timeSlice").getAsLong());
Assert.assertEquals(false, data_9.get().has("component"));
Assert.assertEquals("cache-service", data_9.get().get("peers").getAsString());
Assert.assertEquals("cache-service", data_9.get().get("aggId").getAsString());
Assert.assertEquals(timeSlice, data_9.get().get("timeSlice").getAsLong());
}
}
......@@ -41,12 +41,12 @@ public enum HttpClientTools {
}
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
} finally {
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
logger.error(e);
}
}
return null;
......@@ -65,12 +65,12 @@ public enum HttpClientTools {
}
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
} finally {
try {
httpClient.close();
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
}
}
return null;
......
package com.a.eye.skywalking.collector.worker.storage;
import org.junit.Assert;
import org.junit.Test;
/**
* @author pengys5
*/
public class JoinAndSplitAnalysisDataTestCase {

    @Test
    public void getOrCreate() {
        // Two lookups with an identical id must return the very same cached instance.
        JoinAndSplitAnalysisData analysisData = new JoinAndSplitAnalysisData();
        JoinAndSplitData first = analysisData.getOrCreate("Test1");
        JoinAndSplitData second = analysisData.getOrCreate("Test1");
        Assert.assertEquals(first, second);
    }

    @Test
    public void asMap() {
        // An element created through getOrCreate must be reachable via the backing map.
        JoinAndSplitAnalysisData analysisData = new JoinAndSplitAnalysisData();
        JoinAndSplitData created = analysisData.getOrCreate("Test1");
        Assert.assertEquals(created, analysisData.asMap().get("Test1"));
    }
}
package com.a.eye.skywalking.collector.worker.storage;
import java.lang.reflect.Field;
import org.junit.Assert;
import org.junit.Test;
/**
* @author pengys5
*/
public class JoinAndSplitPersistenceDataTestCase {

    /**
     * getOrCreate must hand back an element carrying the requested id.
     */
    @Test
    public void testGetElseCreate() {
        JoinAndSplitPersistenceData persistenceData = new JoinAndSplitPersistenceData();
        persistenceData.hold();
        JoinAndSplitData joinAndSplitData = persistenceData.getOrCreate("test");
        Assert.assertEquals("test", joinAndSplitData.getId());
    }

    /**
     * Repeated getOrCreate calls with the same id must not grow the held window,
     * while a previously unseen id adds exactly one entry.
     */
    @Test
    public void testSize() {
        JoinAndSplitPersistenceData persistenceData = new JoinAndSplitPersistenceData();
        persistenceData.hold();
        persistenceData.getOrCreate("test_1");
        Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
        persistenceData.getOrCreate("test_1");
        Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
        persistenceData.getOrCreate("test_2");
        Assert.assertEquals(2, persistenceData.getCurrentAndHold().size());
    }

    /**
     * Clearing the held window must remove all previously created entries.
     */
    @Test
    public void testClear() {
        JoinAndSplitPersistenceData persistenceData = new JoinAndSplitPersistenceData();
        persistenceData.hold();
        persistenceData.getOrCreate("test_1");
        Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
        persistenceData.getCurrentAndHold().clear();
        Assert.assertEquals(0, persistenceData.getCurrentAndHold().size());
    }

    /**
     * hold() must flag the locked window as holding.
     */
    @Test
    public void hold() throws NoSuchFieldException, IllegalAccessException {
        JoinAndSplitPersistenceData persistenceData = new JoinAndSplitPersistenceData();
        persistenceData.hold();
        Assert.assertEquals(true, lockedWindowData(persistenceData).isHolding());
    }

    /**
     * release() must clear the holding flag previously set by hold().
     */
    @Test
    public void release() throws NoSuchFieldException, IllegalAccessException {
        JoinAndSplitPersistenceData persistenceData = new JoinAndSplitPersistenceData();
        persistenceData.hold();
        WindowData<JoinAndSplitData> windowData = lockedWindowData(persistenceData);
        Assert.assertEquals(true, windowData.isHolding());
        persistenceData.release();
        Assert.assertEquals(false, windowData.isHolding());
    }

    /**
     * Reads the private "lockedWindowData" field via reflection. Shared by the
     * hold/release tests to avoid duplicating the reflection boilerplate.
     */
    @SuppressWarnings("unchecked")
    private WindowData<JoinAndSplitData> lockedWindowData(JoinAndSplitPersistenceData persistenceData)
        throws NoSuchFieldException, IllegalAccessException {
        // The field is private in JoinAndSplitPersistenceData, so open it up first.
        Field lockedField = persistenceData.getClass().getDeclaredField("lockedWindowData");
        lockedField.setAccessible(true);
        return (WindowData<JoinAndSplitData>)lockedField.get(persistenceData);
    }
}
package com.a.eye.skywalking.collector.worker.storage;
import org.junit.Assert;
import org.junit.Test;
/**
* @author pengys5
*/
public class MergeAnalysisDataTestCase {

    @Test
    public void getOrCreate() {
        // Two lookups with an identical id must return the same cached element.
        MergeAnalysisData analysisData = new MergeAnalysisData();
        MergeData first = analysisData.getOrCreate("Test1");
        MergeData second = analysisData.getOrCreate("Test1");
        Assert.assertEquals(first, second);
    }

    @Test
    public void asMap() {
        // The element created via getOrCreate must be visible through asMap().
        MergeAnalysisData analysisData = new MergeAnalysisData();
        MergeData created = analysisData.getOrCreate("Test1");
        Assert.assertEquals(created, analysisData.asMap().get("Test1"));
    }
}
......@@ -14,51 +14,51 @@ public class MergeWindowDataTestCase {
@Test
public void testConstruction() {
String id = "Test";
MergeData mergeData = new MergeData(id);
JoinAndSplitData joinAndSplitData = new JoinAndSplitData(id);
Assert.assertEquals(id, mergeData.getId());
Assert.assertEquals(id, joinAndSplitData.getId());
}
@Test
public void testSetMergeData() {
String id = "Test";
MergeData mergeData = new MergeData(id);
JoinAndSplitData joinAndSplitData = new JoinAndSplitData(id);
mergeData.setMergeData("Column_1", "Value_1");
Assert.assertEquals("Value_1", mergeData.asMap().get("Column_1"));
mergeData.setMergeData("Column_1", "Value_1");
Assert.assertEquals("Value_1", mergeData.asMap().get("Column_1"));
joinAndSplitData.set("Column_1", "Value_1");
Assert.assertEquals("Value_1", joinAndSplitData.asMap().get("Column_1"));
joinAndSplitData.set("Column_1", "Value_1");
Assert.assertEquals("Value_1", joinAndSplitData.asMap().get("Column_1"));
mergeData.setMergeData("Column_1", "Value_2");
Assert.assertEquals("Value_2,Value_1", mergeData.asMap().get("Column_1"));
joinAndSplitData.set("Column_1", "Value_2");
Assert.assertEquals("Value_2,Value_1", joinAndSplitData.asMap().get("Column_1"));
mergeData.setMergeData("Column_2", "Value_3");
Assert.assertEquals("Value_3", mergeData.asMap().get("Column_2"));
joinAndSplitData.set("Column_2", "Value_3");
Assert.assertEquals("Value_3", joinAndSplitData.asMap().get("Column_2"));
}
@Test
public void testMerge() {
String id = "Test";
MergeData mergeData_1 = new MergeData(id);
mergeData_1.setMergeData("Column_1", "Value_1");
JoinAndSplitData joinAndSplitData_1 = new JoinAndSplitData(id);
joinAndSplitData_1.set("Column_1", "Value_1");
MergeData mergeData_2 = new MergeData(id);
mergeData_2.setMergeData("Column_1", "Value_2");
JoinAndSplitData joinAndSplitData_2 = new JoinAndSplitData(id);
joinAndSplitData_2.set("Column_1", "Value_2");
mergeData_1.merge(mergeData_2);
Assert.assertEquals("Value_2,Value_1", mergeData_1.asMap().get("Column_1"));
joinAndSplitData_1.merge(joinAndSplitData_2);
Assert.assertEquals("Value_2,Value_1", joinAndSplitData_1.asMap().get("Column_1"));
}
@Test
public void testMergeMap() {
String id = "Test";
MergeData mergeData_1 = new MergeData(id);
mergeData_1.setMergeData("Column_1", "Value_1");
JoinAndSplitData joinAndSplitData_1 = new JoinAndSplitData(id);
joinAndSplitData_1.set("Column_1", "Value_1");
Map<String, Object> dbData = new HashMap<>();
dbData.put("Column_1", "Value_2");
mergeData_1.merge(dbData);
Assert.assertEquals("Value_2,Value_1", mergeData_1.asMap().get("Column_1"));
joinAndSplitData_1.merge(dbData);
Assert.assertEquals("Value_2,Value_1", joinAndSplitData_1.asMap().get("Column_1"));
}
}
......@@ -17,7 +17,7 @@ public class MetricPersistenceTestCase {
MetricPersistenceData metricPersistenceData = new MetricPersistenceData();
metricPersistenceData.hold();
MetricData metricData = metricPersistenceData.getOrCreate(id);
metricData.setMetric("Column_1", 10L);
metricData.set("Column_1", 10L);
Assert.assertEquals(id, metricData.getId());
MetricData metricData1 = metricPersistenceData.getOrCreate(id);
......@@ -48,7 +48,7 @@ public class MetricPersistenceTestCase {
Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData");
testAField.setAccessible(true);
WindowData<MergeData> windowData = (WindowData<MergeData>)testAField.get(persistenceData);
WindowData<JoinAndSplitData> windowData = (WindowData<JoinAndSplitData>)testAField.get(persistenceData);
Assert.assertEquals(true, windowData.isHolding());
}
......@@ -59,7 +59,7 @@ public class MetricPersistenceTestCase {
Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData");
testAField.setAccessible(true);
WindowData<MergeData> windowData = (WindowData<MergeData>)testAField.get(persistenceData);
WindowData<JoinAndSplitData> windowData = (WindowData<JoinAndSplitData>)testAField.get(persistenceData);
Assert.assertEquals(true, windowData.isHolding());
persistenceData.release();
......
......@@ -34,10 +34,10 @@ public class MetricWindowDataTestCase {
String id_1 = "2016" + Const.ID_SPLIT + "B";
MetricData metricData = new MetricData(id_1);
metricData.setMetric("Column", 10L);
metricData.set("Column", 10L);
Assert.assertEquals(10L, metricData.asMap().get("Column"));
metricData.setMetric("Column", 10L);
metricData.set("Column", 10L);
Assert.assertEquals(20L, metricData.asMap().get("Column"));
}
......@@ -45,10 +45,10 @@ public class MetricWindowDataTestCase {
public void testMerge() {
String id_1 = "2016" + Const.ID_SPLIT + "B";
MetricData metricData_1 = new MetricData(id_1);
metricData_1.setMetric("Column", 10L);
metricData_1.set("Column", 10L);
MetricData metricData_2 = new MetricData(id_1);
metricData_2.setMetric("Column", 10L);
metricData_2.set("Column", 10L);
metricData_1.merge(metricData_2);
Assert.assertEquals(20L, metricData_1.asMap().get("Column"));
......@@ -58,7 +58,7 @@ public class MetricWindowDataTestCase {
public void testMergeMapData() {
String id_1 = "2016" + Const.ID_SPLIT + "B";
MetricData metricData_1 = new MetricData(id_1);
metricData_1.setMetric("Column", 10L);
metricData_1.set("Column", 10L);
Map<String, Object> dbData = new HashMap<>();
dbData.put("Column", 10L);
......
......@@ -21,12 +21,12 @@ public class RecordPersistenceTestCase {
recordPersistenceData.hold();
RecordData recordData = recordPersistenceData.getOrCreate(id);
recordData.setRecord(record);
recordData.set(record);
Assert.assertEquals(id, recordData.getId());
RecordData recordData1 = recordPersistenceData.getOrCreate(id);
Assert.assertEquals("Value_1", recordData1.getRecord().get("Column_1").getAsString());
Assert.assertEquals("Value_1", recordData1.get().get("Column_1").getAsString());
}
@Test
......@@ -52,7 +52,7 @@ public class RecordPersistenceTestCase {
Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData");
testAField.setAccessible(true);
WindowData<MergeData> windowData = (WindowData<MergeData>)testAField.get(persistenceData);
WindowData<JoinAndSplitData> windowData = (WindowData<JoinAndSplitData>)testAField.get(persistenceData);
Assert.assertEquals(true, windowData.isHolding());
}
......@@ -63,7 +63,7 @@ public class RecordPersistenceTestCase {
Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData");
testAField.setAccessible(true);
WindowData<MergeData> windowData = (WindowData<MergeData>)testAField.get(persistenceData);
WindowData<JoinAndSplitData> windowData = (WindowData<JoinAndSplitData>)testAField.get(persistenceData);
Assert.assertEquals(true, windowData.isHolding());
persistenceData.release();
......
......@@ -16,7 +16,7 @@ public class RecordWindowDataTestCase {
RecordData recordData = new RecordData(id_1);
Assert.assertEquals(id_1, recordData.getId());
Assert.assertEquals("B" + Const.ID_SPLIT + "C", recordData.getRecord().get("aggId").getAsString());
Assert.assertEquals("B" + Const.ID_SPLIT + "C", recordData.get().get("aggId").getAsString());
}
@Test
......@@ -26,10 +26,10 @@ public class RecordWindowDataTestCase {
JsonObject record = new JsonObject();
record.addProperty("Column", "VALUE");
recordData.setRecord(record);
recordData.set(record);
Assert.assertEquals(id_1, recordData.getId());
Assert.assertEquals("B" + Const.ID_SPLIT + "C", recordData.getRecord().get("aggId").getAsString());
Assert.assertEquals("VALUE", recordData.getRecord().get("Column").getAsString());
Assert.assertEquals("B" + Const.ID_SPLIT + "C", recordData.get().get("aggId").getAsString());
Assert.assertEquals("VALUE", recordData.get().get("Column").getAsString());
}
}
......@@ -3,7 +3,7 @@ package com.a.eye.skywalking.collector.worker.tools;
import com.a.eye.skywalking.collector.actor.AbstractClusterWorker;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.mock.MergeDataAnswer;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.JoinAndSplitData;
import org.junit.Assert;
/**
......@@ -14,9 +14,9 @@ public enum MergeDataAggTools {
public void testOnWork(AbstractClusterWorker agg, MergeDataAnswer mergeDataAnswer) throws Exception {
String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
MergeData mergeData = new MergeData(id);
mergeData.setMergeData("Column", "VALUE");
agg.allocateJob(mergeData);
Assert.assertEquals("VALUE", mergeDataAnswer.getMergeDataList().get(0).asMap().get("Column"));
JoinAndSplitData joinAndSplitData = new JoinAndSplitData(id);
joinAndSplitData.set("Column", "VALUE");
agg.allocateJob(joinAndSplitData);
Assert.assertEquals("VALUE", mergeDataAnswer.getJoinAndSplitDataList().get(0).asMap().get("Column"));
}
}
......@@ -17,6 +17,6 @@ public enum RecordDataAggTools {
RecordData recordData = new RecordData(id);
agg.allocateJob(recordData);
RecordData result = RecordDataTool.INSTANCE.getRecord(recordDataAnswer.getRecordDataList(), id);
Assert.assertEquals("A" + Const.ID_SPLIT + "B", result.getRecord().get("aggId").getAsString());
Assert.assertEquals("A" + Const.ID_SPLIT + "B", result.get().get("aggId").getAsString());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册