提交 d4eed42a 编写于 作者: P pengys5

no message

上级 70fedb10
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.worker.storage.MergeAnalysisData;
/**
* @author pengys5
*/
public abstract class MergeAnalysisMember extends AnalysisMember {
private MergeAnalysisData mergeAnalysisData;
public MergeAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
mergeAnalysisData = new MergeAnalysisData();
}
private MergeAnalysisData getMergeAnalysisData() {
return mergeAnalysisData;
}
final protected void setMergeData(String id, String column, String value) throws Exception {
getMergeAnalysisData().getOrCreate(id).setMergeData(column, value);
}
@Override
final protected void aggregation() throws Exception {
getMergeAnalysisData().asMap().forEach((key, value) -> {
try {
aggWorkRefs().tell(value);
} catch (Exception e) {
e.printStackTrace();
}
});
getMergeAnalysisData().asMap().clear();
}
protected abstract WorkerRefs aggWorkRefs();
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.Role;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import java.util.List;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class MergePersistenceMember extends PersistenceMember<MergePersistenceData, MergeData> {
private Logger logger = LogManager.getFormatterLogger(MergePersistenceMember.class);
protected MergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public MergePersistenceData initializeData() {
return new MergePersistenceData();
}
@Override
final public void analyse(Object message) throws Exception {
if (message instanceof MergeData) {
MergeData mergeData = (MergeData) message;
MergePersistenceData data = getPersistenceData();
data.hold();
data.getOrCreate(mergeData.getId()).merge(mergeData);
data.release();
} else {
logger.error("unhandled message, message instance must MergeData, but is %s", message.getClass().toString());
}
}
@Override
final protected void prepareIndex(List<IndexRequestBuilder> builderList) {
Map<String, MergeData> lastData = getPersistenceData().getLast().asMap();
extractData(lastData);
Client client = EsClient.INSTANCE.getClient();
lastData.forEach((key, value) -> {
IndexRequestBuilder builder = client.prepareIndex(esIndex(), esType(), key).setSource(value.asMap());
builderList.add(builder);
});
lastData.clear();
}
}
package com.a.eye.skywalking.collector.worker.storage;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @author pengys5
*/
public class MergeAnalysisData {
private WindowData<MergeData> windowData = new WindowData(new LinkedHashMap<String, MergeData>());
public MergeData getOrCreate(String id) {
if (!windowData.containsKey(id)) {
windowData.put(id, new MergeData(id));
}
return windowData.get(id);
}
public Map<String, MergeData> asMap() {
return windowData.asMap();
}
}
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage;
import java.util.*;
/**
* @author pengys5
*/
public class MergeData extends AbstractHashMessage implements Data {
public static final String SPLIT = ",";
private String id;
private Map<String, Set<String>> mergeData = new HashMap<>();
public MergeData(String key) {
super(key);
this.id = key;
}
public String getId() {
return id;
}
public void setMergeData(String column, String value) {
if (!mergeData.containsKey(column)) {
mergeData.put(column, new HashSet<>());
}
mergeData.get(column).add(value);
}
public void merge(MergeData data) {
data.mergeData.forEach((column, valueSet) -> valueSet.forEach(value -> setMergeData(column, value)));
}
public void merge(Map<String, ?> dbData) {
dbData.forEach((column, dbValue) -> {
if (!AbstractIndex.TIME_SLICE.equals(column) && !AbstractIndex.AGG_COLUMN.equals(column)) {
String[] dbValues = String.valueOf(dbValue).split(SPLIT);
for (String value : dbValues) {
setMergeData(column, value);
}
}
});
}
public Map<String, String> asMap() {
Map<String, String> source = new HashMap<>();
mergeData.forEach((column, valueSet) -> {
Iterator<String> iterator = valueSet.iterator();
StringBuilder builder = new StringBuilder();
int i = 0;
while (iterator.hasNext()) {
if (i == 0) {
builder.append(iterator.next());
} else {
builder.append(SPLIT).append(iterator.next());
}
i++;
}
source.put(column, builder.toString());
});
return source;
}
}
\ No newline at end of file
package com.a.eye.skywalking.collector.worker.storage;
/**
* @author pengys5
*/
public class MergePersistenceData extends Window<MergeData> implements PersistenceData<MergeData> {
private WindowData<MergeData> lockedWindowData;
public MergeData getOrCreate(String id) {
if (!lockedWindowData.containsKey(id)) {
lockedWindowData.put(id, new MergeData(id));
}
return lockedWindowData.get(id);
}
public void hold() {
lockedWindowData = getCurrentAndHold();
}
public void release() {
lockedWindowData.release();
lockedWindowData = null;
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient;
import com.a.eye.skywalking.collector.worker.storage.EsClient;
import com.a.eye.skywalking.collector.worker.storage.MergeData;
import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import static org.powermock.api.mockito.PowerMockito.*;
/**
* @author pengys5
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({TestMergePersistenceMember.class, EsClient.class})
@PowerMockIgnore({"javax.management.*"})
public class MergePersistenceMemberTestCase {
private TestMergePersistenceMember mergePersistenceMember;
private MergePersistenceData persistenceData;
@Before
public void init() throws Exception {
MockEsBulkClient mockEsBulkClient = new MockEsBulkClient();
mockEsBulkClient.createMock();
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null);
LocalWorkerContext localWorkerContext = new LocalWorkerContext();
mergePersistenceMember = PowerMockito.spy(new TestMergePersistenceMember(TestMergePersistenceMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext));
persistenceData = mock(MergePersistenceData.class);
MergeData mergeData = mock(MergeData.class);
when(mergePersistenceMember, "getPersistenceData").thenReturn(persistenceData);
when(persistenceData.getOrCreate(Mockito.anyString())).thenReturn(mergeData);
doCallRealMethod().when(mergePersistenceMember).analyse(Mockito.any(MergeData.class));
}
@Test
public void testAnalyse() throws Exception {
String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B";
MergeData mergeData = new MergeData(id);
mergeData.setMergeData("Column", "VALUE");
mergePersistenceMember.analyse(mergeData);
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.ProviderNotFoundException;
import com.a.eye.skywalking.collector.actor.WorkerRefs;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestMergeAnalysisMember extends MergeAnalysisMember {
TestMergeAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public void analyse(Object message) throws Exception {
}
@Override
public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override
protected WorkerRefs aggWorkRefs() {
return null;
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return null;
}
@Override
public WorkerSelector workerSelector() {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker;
import com.a.eye.skywalking.collector.actor.ClusterWorkerContext;
import com.a.eye.skywalking.collector.actor.LocalWorkerContext;
import com.a.eye.skywalking.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public class TestMergePersistenceMember extends MergePersistenceMember {
TestMergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
@Override
public String esIndex() {
return null;
}
@Override
public String esType() {
return null;
}
public enum Role implements com.a.eye.skywalking.collector.actor.Role {
INSTANCE;
@Override
public String roleName() {
return null;
}
@Override
public WorkerSelector workerSelector() {
return null;
}
}
}
package com.a.eye.skywalking.collector.worker.storage;
import java.lang.reflect.Field;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
/**
* @author pengys5
*/
public class MergePersistenceDataTestCase {
@Test
public void testGetElseCreate() {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.hold();
MergeData mergeData = persistenceData.getOrCreate("test");
Assert.assertEquals("test", mergeData.getId());
}
@Test
public void testSize() {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.hold();
persistenceData.getOrCreate("test_1");
Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
persistenceData.getOrCreate("test_1");
Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
persistenceData.getOrCreate("test_2");
Assert.assertEquals(2, persistenceData.getCurrentAndHold().size());
}
@Test
public void testClear() {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.hold();
persistenceData.getOrCreate("test_1");
Assert.assertEquals(1, persistenceData.getCurrentAndHold().size());
persistenceData.getCurrentAndHold().clear();
Assert.assertEquals(0, persistenceData.getCurrentAndHold().size());
}
@Test
public void hold() throws NoSuchFieldException, IllegalAccessException {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.hold();
Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData");
testAField.setAccessible(true);
WindowData<MergeData> windowData = (WindowData<MergeData>)testAField.get(persistenceData);
Assert.assertEquals(true, windowData.isHolding());
}
@Test
public void release() throws NoSuchFieldException, IllegalAccessException {
MergePersistenceData persistenceData = new MergePersistenceData();
persistenceData.hold();
Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData");
testAField.setAccessible(true);
WindowData<MergeData> windowData = (WindowData<MergeData>)testAField.get(persistenceData);
Assert.assertEquals(true, windowData.isHolding());
persistenceData.release();
Assert.assertEquals(false, windowData.isHolding());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册