diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractASyncMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractASyncMemberProvider.java deleted file mode 100644 index 23207cf880cdf46aedf3460a8a2676b598a62829..0000000000000000000000000000000000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractASyncMemberProvider.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.a.eye.skywalking.collector.actor; - -import akka.actor.ActorRef; -import com.a.eye.skywalking.collector.queue.DaemonThreadFactory; -import com.a.eye.skywalking.collector.queue.MessageHolder; -import com.lmax.disruptor.EventFactory; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; - -import java.lang.reflect.Constructor; - -/** - * @author pengys5 - */ -public abstract class AbstractASyncMemberProvider { - - private RingBuffer ringBuffer; - - public abstract Class memberClass(); - - public T createWorker(EventFactory eventFactory, ActorRef actorRef) throws Exception { - if (memberClass() == null) { - throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()"); - } - - Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class}); - memberConstructor.setAccessible(true); - T member = (T) memberConstructor.newInstance(actorRef); - - // Specify the size of the ring buffer, must be power of 2. - int bufferSize = 1024; - // Construct the Disruptor - Disruptor disruptor = new Disruptor(eventFactory, bufferSize, DaemonThreadFactory.INSTANCE); - // Connect the handler - disruptor.handleEventsWith(member); - // Start the Disruptor, starts all threads running - disruptor.start(); - // Get the ring buffer from the Disruptor to be used for publishing. - ringBuffer = disruptor.getRingBuffer(); - return member; - } - - public void onData(MessageHolder message) { - long sequence = ringBuffer.next(); - try { - ringBuffer.get(sequence).setMessage(message); - } finally { - ringBuffer.publish(sequence); - } - } - - /** - * Use {@link #memberClass()} method returned class's simple name as a role name. - * - * @return is role of Worker - */ - protected String roleName() { - return memberClass().getSimpleName(); - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMember.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMember.java new file mode 100644 index 0000000000000000000000000000000000000000..bb957c54ee25803718502df36cc4abd031325c47 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMember.java @@ -0,0 +1,33 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.queue.MessageHolder; +import com.lmax.disruptor.RingBuffer; + +/** + * @author pengys5 + */ +public abstract class AbstractAsyncMember extends AbstractMember { + + private RingBuffer ringBuffer; + + public AbstractAsyncMember(RingBuffer ringBuffer, ActorRef actorRef) { + super(actorRef); + this.ringBuffer = ringBuffer; + } + + public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { + Object message = event.getMessage(); + event.reset(); + receive(message); + } + + public void beTold(Object message) throws Exception { + long sequence = ringBuffer.next(); + try { + ringBuffer.get(sequence).setMessage(message); + } finally { + ringBuffer.publish(sequence); + } + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMemberProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..452cedfcb721ab81100dd4b329d7d53e30903f0a --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMemberProvider.java @@ -0,0 +1,41 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.queue.DaemonThreadFactory; +import com.a.eye.skywalking.collector.queue.MessageHolder; +import com.a.eye.skywalking.collector.queue.MessageHolderFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; + +import java.lang.reflect.Constructor; + +/** + * @author pengys5 + */ +public abstract class AbstractAsyncMemberProvider extends AbstractMemberProvider { + + public abstract int queueSize(); + + @Override + public T createWorker(ActorRef actorRef) throws Exception { + if (memberClass() == null) { + throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()"); + } + + // Specify the size of the ring buffer, must be power of 2. + int bufferSize = queueSize(); + // Construct the Disruptor + Disruptor disruptor = new Disruptor(MessageHolderFactory.INSTANCE, bufferSize, DaemonThreadFactory.INSTANCE); + // Start the Disruptor, starts all threads running + RingBuffer ringBuffer = disruptor.start(); + + Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{RingBuffer.class, ActorRef.class}); + memberConstructor.setAccessible(true); + T member = (T) memberConstructor.newInstance(ringBuffer, actorRef); + + // Connect the handler + disruptor.handleEventsWith(member); + return member; + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java index ae9d5d6bb2571e9f523ddb24d9d4065d2e4872ac..eb39ecf5deca57f6e58f333130cc7da708af277a 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java @@ -13,13 +13,13 @@ import java.util.List; /** * @author pengys5 */ -public abstract class AbstractMember implements EventHandler> { +public abstract class AbstractMember implements EventHandler { private Logger logger = LogManager.getFormatterLogger(AbstractMember.class); private ActorRef actorRef; - public ActorRef getSelf() { + private ActorRef getSelf() { return actorRef; } @@ -27,6 +27,8 @@ public abstract class AbstractMember implements EventHandler this.actorRef = actorRef; } + public abstract void beTold(Object message) throws Exception; + /** * Receive the message to analyse. * @@ -35,12 +37,6 @@ public abstract class AbstractMember implements EventHandler */ public abstract void receive(Object message) throws Exception; - public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { - T message = event.getMessage(); - event.reset(); - receive(message); - } - /** * Send analysed data to next Worker. * @@ -49,7 +45,7 @@ public abstract class AbstractMember implements EventHandler * @param message is the data used to send to next worker. * @throws Exception */ - public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Exception { + public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Exception { logger.debug("worker provider: %s ,role name: %s", targetWorkerProvider.getClass().getName(), targetWorkerProvider.roleName()); List availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName()); selector.select(availableWorks, message).tell(message, getSelf()); diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java new file mode 100644 index 0000000000000000000000000000000000000000..a78824a8074e5c3b6b195293a1f441207192d706 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java @@ -0,0 +1,13 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; + +/** + * @author pengys5 + */ +public abstract class AbstractMemberProvider { + + public abstract Class memberClass(); + + public abstract T createWorker(ActorRef actorRef) throws Exception; +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMember.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMember.java new file mode 100644 index 0000000000000000000000000000000000000000..68adfd16326f9dc9c52d2e26a35c9cc6ded59f00 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMember.java @@ -0,0 +1,23 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.queue.MessageHolder; + +/** + * @author pengys5 + */ +public abstract class AbstractSyncMember extends AbstractMember { + + public AbstractSyncMember(ActorRef actorRef) { + super(actorRef); + } + + @Override + public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { + } + + @Override + public void beTold(Object message) throws Exception { + receive(message); + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java index a1016e4f4cf29c0bd652945518737d13f4702b9c..fb83086a74f04f73338d36281e45e53f6b33781b 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java @@ -7,27 +7,17 @@ import java.lang.reflect.Constructor; /** * @author pengys5 */ -public abstract class AbstractSyncMemberProvider { - - public abstract Class memberClass(); +public abstract class AbstractSyncMemberProvider extends AbstractMemberProvider { + @Override public T createWorker(ActorRef actorRef) throws Exception { if (memberClass() == null) { throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()"); } - Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class}); + Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class}); memberConstructor.setAccessible(true); T member = (T) memberConstructor.newInstance(actorRef); return member; } - - /** - * Use {@link #memberClass()} method returned class's simple name as a role name. - * - * @return is role of Worker - */ - protected String roleName() { - return memberClass().getSimpleName(); - } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java index d66f88473a4343620b2f417858f7e6f537a80477..670d0a1ba8eb24e15a678a1b5dc9107e96218e1d 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java @@ -38,7 +38,7 @@ import java.util.List; * } * }}} */ -public abstract class AbstractWorker extends UntypedActor { +public abstract class AbstractWorker extends UntypedActor { private Logger logger = LogManager.getFormatterLogger(AbstractWorker.class); @@ -96,7 +96,7 @@ public abstract class AbstractWorker extends UntypedActor { * @param message is the data used to send to next worker. * @throws Throwable */ - public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Throwable { + public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Throwable { List availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName()); selector.select(availableWorks, message).tell(message, getSelf()); } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java index 3582d868c05a90c9b60b6fd4d1c64db2f4be32ec..3e6d20f84420001e3ad8afffde840ec066255271 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java @@ -13,7 +13,7 @@ import java.util.List; * * @author wusheng */ -public interface WorkerSelector { +public interface WorkerSelector { /** * select a {@link WorkerRef} from a {@link WorkerRef} list. * @@ -21,5 +21,5 @@ public interface WorkerSelector { * @param message the {@link AbstractWorker} is going to send. * @return the selected {@link WorkerRef} */ - WorkerRef select(List members, T message); + WorkerRef select(List members, Object message); } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java index 779c2346b49df85270c0a3808e8834372ca2ce7f..fb83f5e6639b1969a2e01cc3686b2a38705cb3cb 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java @@ -3,14 +3,14 @@ package com.a.eye.skywalking.collector.queue; /** * @author pengys5 */ -public class MessageHolder { - private T message; +public class MessageHolder { + private Object message; - public T getMessage() { + public Object getMessage() { return message; } - public void setMessage(T message) { + public void setMessage(Object message) { this.message = message; } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolderFactory.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolderFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..972843e46835e0fa6eebc661d5a68bfb9c2d4ac2 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolderFactory.java @@ -0,0 +1,15 @@ +package com.a.eye.skywalking.collector.queue; + +import com.lmax.disruptor.EventFactory; + +/** + * @author pengys5 + */ +public class MessageHolderFactory implements EventFactory { + + public static MessageHolderFactory INSTANCE = new MessageHolderFactory(); + + public MessageHolder newInstance() { + return new MessageHolder(); + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java index bad069c9904e86ed3578a5cf8e352d950bf5639f..76f5a3f4b4feede3c7a14225467d473ce783fc07 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java @@ -1,9 +1,12 @@ package com.a.eye.skywalking.collector.worker; import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.actor.AbstractAsyncMember; import com.a.eye.skywalking.collector.actor.AbstractMember; +import com.a.eye.skywalking.collector.queue.MessageHolder; import com.a.eye.skywalking.collector.worker.storage.EsClient; import com.google.gson.JsonObject; +import com.lmax.disruptor.RingBuffer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -16,7 +19,7 @@ import java.util.Map; /** * @author pengys5 */ -public abstract class PersistenceMember extends AbstractMember { +public abstract class PersistenceMember extends AbstractAsyncMember { private Logger logger = LogManager.getFormatterLogger(PersistenceMember.class); @@ -24,8 +27,8 @@ public abstract class PersistenceMember extends AbstractMember { private Map persistenceData = new HashMap(); - public PersistenceMember(ActorRef actorRef) { - super(actorRef); + public PersistenceMember(RingBuffer ringBuffer, ActorRef actorRef) { + super(ringBuffer, actorRef); } public abstract String esIndex(); @@ -54,7 +57,6 @@ public abstract class PersistenceMember extends AbstractMember { if (message instanceof PersistenceCommand) { persistence(false); } else { - logger.debug("receive message"); analyse(message); } } @@ -62,25 +64,11 @@ public abstract class PersistenceMember extends AbstractMember { private void persistence(boolean dataFull) { long now = System.currentTimeMillis(); if (now - lastPersistenceTimestamp > 5000 || dataFull) { - boolean success = saveToEs(); + boolean success = EsClient.saveToEs(esIndex(), esType(), persistenceData); if (success) { persistenceData.clear(); lastPersistenceTimestamp = now; } } } - - private boolean saveToEs() { - TransportClient client = EsClient.client(); - BulkRequestBuilder bulkRequest = client.prepareBulk(); - - for (Map.Entry entry : persistenceData.entrySet()) { - String id = entry.getKey(); - JsonObject data = entry.getValue(); - bulkRequest.add(client.prepareIndex(esIndex(), esType(), id).setSource(data.toString())); - } - - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); - return !bulkResponse.hasFailures(); - } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java index 78f0ee48c9d650f2c19e8bb5086625dba71e1085..8380b96397f5a363e7426ae92e18f023d18a4c26 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java @@ -5,9 +5,6 @@ import com.a.eye.skywalking.collector.worker.storage.EsClient; import com.google.gson.JsonObject; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.client.transport.TransportClient; import java.util.HashMap; import java.util.Map; @@ -15,7 +12,7 @@ import java.util.Map; /** * @author pengys5 */ -public abstract class PersistenceWorker extends AbstractWorker { +public abstract class PersistenceWorker extends AbstractWorker { private Logger logger = LogManager.getFormatterLogger(PersistenceWorker.class); @@ -57,25 +54,11 @@ public abstract class PersistenceWorker extends AbstractWorker { private void persistence(boolean dataFull) { long now = System.currentTimeMillis(); if (now - lastPersistenceTimestamp > 5000 || dataFull) { - boolean success = saveToEs(); + boolean success = EsClient.saveToEs(esIndex(), esType(), persistenceData); if (success) { persistenceData.clear(); lastPersistenceTimestamp = now; } } } - - private boolean saveToEs() { - TransportClient client = EsClient.client(); - BulkRequestBuilder bulkRequest = client.prepareBulk(); - - for (Map.Entry entry : persistenceData.entrySet()) { - String id = entry.getKey(); - JsonObject data = entry.getValue(); - bulkRequest.add(client.prepareIndex(esIndex(), esType(), id).setSource(data.toString())); - } - - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); - return !bulkResponse.hasFailures(); - } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java index 82f6f634acf7a2ed8be16bf236daf06fb4d26b89..c23cc6a87d836677f7e7956dea4296814fc57a36 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java @@ -37,4 +37,9 @@ public class WorkerConfig extends ClusterConfig { } } + public static class Queue { + public static class TraceSegmentRecordMember { + public static int Size = 32; + } + } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java index 7f32b11a687c114c03ca9b429f5d6af2991c6c19..32fc7e4f7055532b1a32e7bae12217cc543082e4 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java @@ -1,7 +1,7 @@ package com.a.eye.skywalking.collector.worker.application; import akka.actor.ActorRef; -import com.a.eye.skywalking.collector.actor.AbstractMember; +import com.a.eye.skywalking.collector.actor.AbstractSyncMember; import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider; import com.a.eye.skywalking.collector.actor.selector.RollingSelector; import com.a.eye.skywalking.collector.worker.application.metric.TraceSegmentRecordMember; @@ -19,14 +19,15 @@ import org.apache.logging.log4j.Logger; /** * @author pengys5 */ -public class ApplicationMember extends AbstractMember { +public class ApplicationMember extends AbstractSyncMember { private Logger logger = LogManager.getFormatterLogger(ApplicationMember.class); - private TraceSegmentRecordMember traceSegmentRecordMember = TraceSegmentRecordMember.Factory.INSTANCE.createWorker(TraceSegmentRecordMember.MessageFactory.INSTANCE, getSelf()); + private TraceSegmentRecordMember recordMember; public ApplicationMember(ActorRef actorRef) throws Exception { super(actorRef); + recordMember = TraceSegmentRecordMember.Factory.INSTANCE.createWorker(actorRef); } @Override @@ -34,7 +35,7 @@ public class ApplicationMember extends AbstractMember { if (message instanceof TraceSegment) { logger.debug("begin translate TraceSegment Object to JsonObject"); TraceSegment traceSegment = (TraceSegment) message; - traceSegmentRecordMember.receive(traceSegment); + recordMember.beTold(traceSegment); sendToDAGNodePersistence(traceSegment); sendToNodeInstancePersistence(traceSegment); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java index a8e5bbe8795b517b890f9701a284c565d86320c6..f6bb64567351b03178e0544a96fbec427f2929f7 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java @@ -2,18 +2,18 @@ package com.a.eye.skywalking.collector.worker.application.metric; import akka.actor.ActorRef; -import com.a.eye.skywalking.collector.actor.AbstractASyncMemberProvider; -import com.a.eye.skywalking.collector.actor.AbstractMember; +import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider; import com.a.eye.skywalking.collector.actor.selector.RollingSelector; import com.a.eye.skywalking.collector.queue.MessageHolder; import com.a.eye.skywalking.collector.worker.PersistenceMember; +import com.a.eye.skywalking.collector.worker.WorkerConfig; import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence; import com.a.eye.skywalking.trace.Span; import com.a.eye.skywalking.trace.TraceSegment; import com.a.eye.skywalking.trace.TraceSegmentRef; import com.google.gson.JsonArray; import com.google.gson.JsonObject; -import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.RingBuffer; import java.util.List; import java.util.Map; @@ -33,8 +33,8 @@ public class TraceSegmentRecordMember extends PersistenceMember { return "trace_segment"; } - public TraceSegmentRecordMember(ActorRef actorRef) throws Throwable { - super(actorRef); + public TraceSegmentRecordMember(RingBuffer ringBuffer, ActorRef actorRef) { + super(ringBuffer, actorRef); } @Override @@ -47,16 +47,13 @@ public class TraceSegmentRecordMember extends PersistenceMember { } } - public static class MessageFactory implements EventFactory { - public static MessageFactory INSTANCE = new MessageFactory(); + public static class Factory extends AbstractAsyncMemberProvider { + public static Factory INSTANCE = new Factory(); - public MessageHolder newInstance() { - return new MessageHolder(); + @Override + public int queueSize() { + return WorkerConfig.Queue.TraceSegmentRecordMember.Size; } - } - - public static class Factory extends AbstractASyncMemberProvider { - public static Factory INSTANCE = new Factory(); @Override public Class memberClass() { diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java index 1e4004c6789c1a6677b43de0bd2286b66a19ce8b..835e460eccd569c4e0405bf2dd015aab6924a96b 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java @@ -12,7 +12,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class DAGNodePersistence extends PersistenceWorker { +public class DAGNodePersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(DAGNodePersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java index 0edb3c4be266d137c2d48619710bc264b1c081c9..40051efcda205110ec58f72bcf66d40c78056c01 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java @@ -10,7 +10,7 @@ import org.apache.logging.log4j.Logger; /** * @author pengys5 */ -public class NodeInstancePersistence extends PersistenceWorker { +public class NodeInstancePersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(NodeInstancePersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java index af7926dfb08fad29e42a94aa7e4329df940a1f94..3a80b06a98e26a3e031959b0e3555115e93fba00 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java @@ -12,7 +12,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class ResponseCostPersistence extends PersistenceWorker { +public class ResponseCostPersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(ResponseCostPersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java index 164e73ec25d255a043deae872df3f38cfaa2a867..a5dc5bb5093dcde35c455fa549539dea0f66fe7d 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java @@ -12,7 +12,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class ResponseSummaryPersistence extends PersistenceWorker { +public class ResponseSummaryPersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(ResponseSummaryPersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java index afff42b3052b1f659c90e865f5dc399eb0cb3aa1..546150c8f595bd191875bb8ddf3b7d4bf59f5d4a 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java @@ -10,7 +10,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class DAGNodeRefPersistence extends PersistenceWorker { +public class DAGNodeRefPersistence extends PersistenceWorker { @Override public String esIndex() { diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java index 7950925218efa2ca8f3b539570a1c60c004d77aa..7e3517b148ed697c9c8b33b6d666e0e5b66ce521 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java @@ -16,11 +16,14 @@ public class TraceSegmentReceiver extends AbstractWorker { private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class); - private ApplicationMember applicationMember = ApplicationMember.Factory.INSTANCE.createWorker(getSelf()); + private ApplicationMember applicationMember; + + private ApplicationRefMember applicationRefMember; - private ApplicationRefMember applicationRefMember = ApplicationRefMember.Factory.INSTANCE.createWorker(getSelf()); public TraceSegmentReceiver() throws Exception { + applicationMember = ApplicationMember.Factory.INSTANCE.createWorker(getSelf()); + applicationRefMember = ApplicationRefMember.Factory.INSTANCE.createWorker(getSelf()); } @Override @@ -29,8 +32,8 @@ public class TraceSegmentReceiver extends AbstractWorker { TraceSegment traceSegment = (TraceSegment) message; logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", traceSegment.getTraceSegmentId()); - applicationMember.receive(traceSegment); - applicationRefMember.receive(traceSegment); + applicationMember.beTold(traceSegment); + applicationRefMember.beTold(traceSegment); } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java index 5f84d4ded24999a1b5259ec6bb1e1b10909b1e18..3a8f6f098602b01646e4a48fd061d6507909dfbc 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java @@ -1,5 +1,8 @@ package com.a.eye.skywalking.collector.worker.storage; +import com.google.gson.JsonObject; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -7,6 +10,7 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Map; /** * @author pengys5 @@ -24,7 +28,16 @@ public class EsClient { .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300)); } - public static TransportClient client() { - return client; + public static boolean saveToEs(String esIndex, String esType, Map persistenceData) { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + + for (Map.Entry entry : persistenceData.entrySet()) { + String id = entry.getKey(); + JsonObject data = entry.getValue(); + bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(data.toString())); + } + + BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + return !bulkResponse.hasFailures(); } }