提交 c43d375d 编写于 作者: P pengys5

no message

上级 e77abe23
......@@ -9,23 +9,22 @@ import com.a.eye.skywalking.collector.worker.PersistenceMember;
import com.a.eye.skywalking.collector.worker.config.CacheSizeConfig;
import com.a.eye.skywalking.collector.worker.segment.SegmentIndex;
import com.a.eye.skywalking.collector.worker.segment.entity.Segment;
import com.a.eye.skywalking.collector.worker.storage.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import com.a.eye.skywalking.collector.worker.storage.AbstractIndex;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.PersistenceWorkerListener;
import com.a.eye.skywalking.collector.worker.storage.SegmentData;
import com.a.eye.skywalking.collector.worker.storage.SegmentPersistenceData;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
/**
* @author pengys5
*/
public class SegmentSave extends PersistenceMember<SegmentPersistenceData, SegmentData> {
private Logger logger = LogManager.getFormatterLogger(SegmentSave.class);
@Override
public String esIndex() {
return SegmentIndex.INDEX;
......@@ -46,22 +45,19 @@ public class SegmentSave extends PersistenceMember<SegmentPersistenceData, Segme
return new SegmentPersistenceData();
}
int i = 0;
@Override
final public void analyse(Object message) throws Exception {
if (message instanceof Segment) {
Segment segment = (Segment) message;
SegmentPersistenceData data = getPersistenceData();
data.hold();
data.getOrCreate(segment.getTraceSegmentId() + i).setSegmentStr(segment.getJsonStr());
data.getOrCreate(segment.getTraceSegmentId()).setSegmentStr(segment.getJsonStr());
if (data.size() >= CacheSizeConfig.Cache.Persistence.SIZE) {
persistence(data.asMap());
}
data.release();
i++;
} else {
logger.error("unhandled message, message instance must Segment, but is %s", message.getClass().toString());
logger().error("unhandled message, message instance must Segment, but is %s", message.getClass().toString());
}
}
......
package com.a.eye.skywalking.collector.worker.noderef.analysis;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.RollingSelector;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
import com.a.eye.skywalking.collector.worker.Const;
import com.a.eye.skywalking.collector.worker.storage.MetricAnalysisData;
import java.lang.reflect.Field;
import org.junit.Assert;
import org.junit.Test;
/**
* @author pengys5
*/
public class AbstractNodeRefResSumAnalysisTestCase {
@Test
public void analyseResSum() throws Exception {
Impl impl = new Impl(Role.INSTANCE, null, null);
AbstractNodeRefResSumAnalysis.NodeRefResRecord record =
new AbstractNodeRefResSumAnalysis.NodeRefResRecord(1, 2, 3, 4);
record.setStartTime(10);
record.setEndTime(20);
record.setError(false);
String id = 2017 + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
record.setNodeRefId(id);
Assert.assertEquals(id, record.getNodeRefId());
impl.analyseResSum(record);
record.setStartTime(0);
record.setEndTime(2000);
record.setError(false);
impl.analyseResSum(record);
record.setStartTime(0);
record.setEndTime(4000);
record.setError(false);
impl.analyseResSum(record);
record.setStartTime(0);
record.setEndTime(6000);
record.setError(false);
impl.analyseResSum(record);
record.setStartTime(0);
record.setEndTime(6000);
record.setError(true);
impl.analyseResSum(record);
Field testAField = impl.getClass().getSuperclass().getSuperclass().getDeclaredField("metricAnalysisData");
testAField.setAccessible(true);
MetricAnalysisData metricAnalysisData = (MetricAnalysisData)testAField.get(impl);
Assert.assertEquals(1L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("oneSecondLess"));
Assert.assertEquals(1L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("threeSecondLess"));
Assert.assertEquals(1L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("fiveSecondLess"));
Assert.assertEquals(1L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("fiveSecondGreater"));
Assert.assertEquals(1L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("error"));
Assert.assertEquals(5L, metricAnalysisData.asMap().get("2017..-..A..-..B").asMap().get("summary"));
}
class Impl extends AbstractNodeRefResSumAnalysis {
Impl(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override public void analyse(Object message) throws Exception {
}
@Override protected WorkerRefs aggWorkRefs() {
return null;
}
}
enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return Impl.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册