diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergeAnalysisMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergeAnalysisMember.java deleted file mode 100644 index 395759e87b4a09788073c843127e88d2407905ae..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergeAnalysisMember.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.a.eye.skywalking.collector.worker; - -import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; -import com.a.eye.skywalking.collector.actor.LocalWorkerContext; -import com.a.eye.skywalking.collector.actor.Role; -import com.a.eye.skywalking.collector.actor.WorkerRefs; -import com.a.eye.skywalking.collector.worker.storage.MergeAnalysisData; - -/** - * @author pengys5 - */ -public abstract class MergeAnalysisMember extends AnalysisMember { - private MergeAnalysisData mergeAnalysisData; - - public MergeAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); - mergeAnalysisData = new MergeAnalysisData(); - } - - private MergeAnalysisData getMergeAnalysisData() { - return mergeAnalysisData; - } - - final protected void setMergeData(String id, String column, String value) throws Exception { - getMergeAnalysisData().getOrCreate(id).setMergeData(column, value); - } - - @Override - final protected void aggregation() throws Exception { - getMergeAnalysisData().asMap().forEach((key, value) -> { - try { - aggWorkRefs().tell(value); - } catch (Exception e) { - e.printStackTrace(); - } - }); - getMergeAnalysisData().asMap().clear(); - } - - protected abstract WorkerRefs aggWorkRefs(); -} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergePersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergePersistenceMember.java deleted file mode 100644 index 2e26fb3255c8077c8f9431a422d5d927576e1fc5..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/MergePersistenceMember.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.a.eye.skywalking.collector.worker; - -import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; -import com.a.eye.skywalking.collector.actor.LocalWorkerContext; -import com.a.eye.skywalking.collector.actor.Role; -import com.a.eye.skywalking.collector.worker.storage.EsClient; -import com.a.eye.skywalking.collector.worker.storage.MergeData; -import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.client.Client; - -import java.util.List; -import java.util.Map; - -/** - * @author pengys5 - */ -public abstract class MergePersistenceMember extends PersistenceMember { - - private Logger logger = LogManager.getFormatterLogger(MergePersistenceMember.class); - - protected MergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); - } - - @Override - public MergePersistenceData initializeData() { - return new MergePersistenceData(); - } - - @Override - final public void analyse(Object message) throws Exception { - if (message instanceof MergeData) { - MergeData mergeData = (MergeData) message; - MergePersistenceData data = getPersistenceData(); - data.hold(); - data.getOrCreate(mergeData.getId()).merge(mergeData); - data.release(); - } else { - logger.error("unhandled message, message instance must MergeData, but is %s", message.getClass().toString()); - } - } - - @Override - final protected void prepareIndex(List builderList) { - Map lastData = getPersistenceData().getLast().asMap(); - extractData(lastData); - - Client client = EsClient.INSTANCE.getClient(); - lastData.forEach((key, value) -> { - IndexRequestBuilder builder = client.prepareIndex(esIndex(), esType(), key).setSource(value.asMap()); - builderList.add(builder); - }); - lastData.clear(); - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergeAnalysisData.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergeAnalysisData.java deleted file mode 100644 index 61d70221b765a8f7d1a33e9a8534ea22e2508563..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergeAnalysisData.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.a.eye.skywalking.collector.worker.storage; - -import java.util.LinkedHashMap; -import java.util.Map; - -/** - * @author pengys5 - */ -public class MergeAnalysisData { - - private WindowData windowData = new WindowData(new LinkedHashMap()); - - public MergeData getOrCreate(String id) { - if (!windowData.containsKey(id)) { - windowData.put(id, new MergeData(id)); - } - return windowData.get(id); - } - - public Map asMap() { - return windowData.asMap(); - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergeData.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergeData.java deleted file mode 100644 index 8c179f629dd37d7ce60a947a92a816b4a0b7a25a..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergeData.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.a.eye.skywalking.collector.worker.storage; - -import com.a.eye.skywalking.collector.actor.selector.AbstractHashMessage; - -import java.util.*; - -/** - * @author pengys5 - */ -public class MergeData extends AbstractHashMessage implements Data { - - public static final String SPLIT = ","; - - private String id; - - private Map> mergeData = new HashMap<>(); - - public MergeData(String key) { - super(key); - this.id = key; - } - - public String getId() { - return id; - } - - public void setMergeData(String column, String value) { - if (!mergeData.containsKey(column)) { - mergeData.put(column, new HashSet<>()); - } - mergeData.get(column).add(value); - } - - public void merge(MergeData data) { - data.mergeData.forEach((column, valueSet) -> valueSet.forEach(value -> setMergeData(column, value))); - } - - public void merge(Map dbData) { - dbData.forEach((column, dbValue) -> { - if (!AbstractIndex.TIME_SLICE.equals(column) && !AbstractIndex.AGG_COLUMN.equals(column)) { - String[] dbValues = String.valueOf(dbValue).split(SPLIT); - for (String value : dbValues) { - setMergeData(column, value); - } - } - }); - } - - public Map asMap() { - Map source = new HashMap<>(); - mergeData.forEach((column, valueSet) -> { - Iterator iterator = valueSet.iterator(); - StringBuilder builder = new StringBuilder(); - - int i = 0; - - while (iterator.hasNext()) { - if (i == 0) { - builder.append(iterator.next()); - } else { - builder.append(SPLIT).append(iterator.next()); - } - i++; - } - source.put(column, builder.toString()); - }); - - return source; - } -} \ No newline at end of file diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceData.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceData.java deleted file mode 100644 index bf708ae6d5e6070a5fb7b12548e86dcae69ceb9c..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceData.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.a.eye.skywalking.collector.worker.storage; - -/** - * @author pengys5 - */ -public class MergePersistenceData extends Window implements PersistenceData { - - private WindowData lockedWindowData; - - public MergeData getOrCreate(String id) { - if (!lockedWindowData.containsKey(id)) { - lockedWindowData.put(id, new MergeData(id)); - } - return lockedWindowData.get(id); - } - - public void hold() { - lockedWindowData = getCurrentAndHold(); - } - - public void release() { - lockedWindowData.release(); - lockedWindowData = null; - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergePersistenceMemberTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergePersistenceMemberTestCase.java deleted file mode 100644 index ecf119012506fc47a948a651d4f1c62d2ed2158e..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/MergePersistenceMemberTestCase.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.a.eye.skywalking.collector.worker; - -import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; -import com.a.eye.skywalking.collector.actor.LocalWorkerContext; -import com.a.eye.skywalking.collector.worker.mock.MockEsBulkClient; -import com.a.eye.skywalking.collector.worker.storage.EsClient; -import com.a.eye.skywalking.collector.worker.storage.MergeData; -import com.a.eye.skywalking.collector.worker.storage.MergePersistenceData; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import static org.powermock.api.mockito.PowerMockito.*; - -/** - * @author pengys5 - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({TestMergePersistenceMember.class, EsClient.class}) -@PowerMockIgnore({"javax.management.*"}) -public class MergePersistenceMemberTestCase { - - private TestMergePersistenceMember mergePersistenceMember; - private MergePersistenceData persistenceData; - - @Before - public void init() throws Exception { - MockEsBulkClient mockEsBulkClient = new MockEsBulkClient(); - mockEsBulkClient.createMock(); - - ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext(null); - LocalWorkerContext localWorkerContext = new LocalWorkerContext(); - mergePersistenceMember = PowerMockito.spy(new TestMergePersistenceMember(TestMergePersistenceMember.Role.INSTANCE, clusterWorkerContext, localWorkerContext)); - - persistenceData = mock(MergePersistenceData.class); - MergeData mergeData = mock(MergeData.class); - - when(mergePersistenceMember, "getPersistenceData").thenReturn(persistenceData); - when(persistenceData.getOrCreate(Mockito.anyString())).thenReturn(mergeData); - - doCallRealMethod().when(mergePersistenceMember).analyse(Mockito.any(MergeData.class)); - } - - @Test - public void testAnalyse() throws Exception { - String id = "2016" + Const.ID_SPLIT + "A" + Const.ID_SPLIT + "B"; - MergeData mergeData = new MergeData(id); - mergeData.setMergeData("Column", "VALUE"); - - mergePersistenceMember.analyse(mergeData); - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergeAnalysisMember.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergeAnalysisMember.java deleted file mode 100644 index 96cc8fd6adcb3d52190b4c3a3a9894c19a44f25c..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergeAnalysisMember.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.a.eye.skywalking.collector.worker; - -import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; -import com.a.eye.skywalking.collector.actor.LocalWorkerContext; -import com.a.eye.skywalking.collector.actor.ProviderNotFoundException; -import com.a.eye.skywalking.collector.actor.WorkerRefs; -import com.a.eye.skywalking.collector.actor.selector.WorkerSelector; - -/** - * @author pengys5 - */ -public class TestMergeAnalysisMember extends MergeAnalysisMember { - - TestMergeAnalysisMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); - } - - @Override - public void analyse(Object message) throws Exception { - - } - - @Override - public void preStart() throws ProviderNotFoundException { - super.preStart(); - } - - @Override - protected WorkerRefs aggWorkRefs() { - return null; - } - - public enum Role implements com.a.eye.skywalking.collector.actor.Role { - INSTANCE; - - @Override - public String roleName() { - return null; - } - - @Override - public WorkerSelector workerSelector() { - return null; - } - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergePersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergePersistenceMember.java deleted file mode 100644 index 0f81a9043e1973d74d169abb8fbb5937cd0178cd..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/TestMergePersistenceMember.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.a.eye.skywalking.collector.worker; - -import com.a.eye.skywalking.collector.actor.ClusterWorkerContext; -import com.a.eye.skywalking.collector.actor.LocalWorkerContext; -import com.a.eye.skywalking.collector.actor.selector.WorkerSelector; - -/** - * @author pengys5 - */ -public class TestMergePersistenceMember extends MergePersistenceMember { - TestMergePersistenceMember(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { - super(role, clusterContext, selfContext); - } - - @Override - public String esIndex() { - return null; - } - - @Override - public String esType() { - return null; - } - - public enum Role implements com.a.eye.skywalking.collector.actor.Role { - INSTANCE; - - @Override - public String roleName() { - return null; - } - - @Override - public WorkerSelector workerSelector() { - return null; - } - } -} diff --git a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceDataTestCase.java b/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceDataTestCase.java deleted file mode 100644 index 4d64703f3f4eab884b04bd8e6e6d918aa8abceb8..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-worker/src/test/java/com/a/eye/skywalking/collector/worker/storage/MergePersistenceDataTestCase.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.a.eye.skywalking.collector.worker.storage; - -import java.lang.reflect.Field; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * @author pengys5 - */ -public class MergePersistenceDataTestCase { - - @Test - public void testGetElseCreate() { - MergePersistenceData persistenceData = new MergePersistenceData(); - persistenceData.hold(); - MergeData mergeData = persistenceData.getOrCreate("test"); - Assert.assertEquals("test", mergeData.getId()); - } - - @Test - public void testSize() { - MergePersistenceData persistenceData = new MergePersistenceData(); - persistenceData.hold(); - persistenceData.getOrCreate("test_1"); - Assert.assertEquals(1, persistenceData.getCurrentAndHold().size()); - persistenceData.getOrCreate("test_1"); - Assert.assertEquals(1, persistenceData.getCurrentAndHold().size()); - persistenceData.getOrCreate("test_2"); - Assert.assertEquals(2, persistenceData.getCurrentAndHold().size()); - } - - @Test - public void testClear() { - MergePersistenceData persistenceData = new MergePersistenceData(); - persistenceData.hold(); - persistenceData.getOrCreate("test_1"); - Assert.assertEquals(1, persistenceData.getCurrentAndHold().size()); - persistenceData.getCurrentAndHold().clear(); - Assert.assertEquals(0, persistenceData.getCurrentAndHold().size()); - } - - @Test - public void hold() throws NoSuchFieldException, IllegalAccessException { - MergePersistenceData persistenceData = new MergePersistenceData(); - persistenceData.hold(); - - Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData"); - testAField.setAccessible(true); - WindowData windowData = (WindowData)testAField.get(persistenceData); - Assert.assertEquals(true, windowData.isHolding()); - } - - @Test - public void release() throws NoSuchFieldException, IllegalAccessException { - MergePersistenceData persistenceData = new MergePersistenceData(); - persistenceData.hold(); - - Field testAField = persistenceData.getClass().getDeclaredField("lockedWindowData"); - testAField.setAccessible(true); - WindowData windowData = (WindowData)testAField.get(persistenceData); - Assert.assertEquals(true, windowData.isHolding()); - - persistenceData.release(); - Assert.assertEquals(false, windowData.isHolding()); - } -}