From c5b6b57f4237c9e78aadd6cc42bf4bfb3987e3b3 Mon Sep 17 00:00:00 2001 From: pengys5 <8082209@qq.com> Date: Tue, 7 Mar 2017 15:41:29 +0800 Subject: [PATCH] no message --- .../actor/AbstractASyncMemberProvider.java | 61 ------------------- .../collector/actor/AbstractAsyncMember.java | 33 ++++++++++ .../actor/AbstractAsyncMemberProvider.java | 41 +++++++++++++ .../collector/actor/AbstractMember.java | 14 ++--- .../actor/AbstractMemberProvider.java | 13 ++++ .../collector/actor/AbstractSyncMember.java | 23 +++++++ .../actor/AbstractSyncMemberProvider.java | 16 +---- .../collector/actor/AbstractWorker.java | 4 +- .../actor/selector/WorkerSelector.java | 4 +- .../collector/queue/MessageHolder.java | 8 +-- .../collector/queue/MessageHolderFactory.java | 15 +++++ .../collector/worker/PersistenceMember.java | 26 +++----- .../collector/worker/PersistenceWorker.java | 21 +------ .../collector/worker/WorkerConfig.java | 5 ++ .../worker/application/ApplicationMember.java | 9 +-- .../metric/TraceSegmentRecordMember.java | 23 +++---- .../persistence/DAGNodePersistence.java | 2 +- .../persistence/NodeInstancePersistence.java | 2 +- .../persistence/ResponseCostPersistence.java | 2 +- .../ResponseSummaryPersistence.java | 2 +- .../presistence/DAGNodeRefPersistence.java | 2 +- .../worker/receiver/TraceSegmentReceiver.java | 11 ++-- .../collector/worker/storage/EsClient.java | 17 +++++- 23 files changed, 197 insertions(+), 157 deletions(-) delete mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractASyncMemberProvider.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMember.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMemberProvider.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMember.java create mode 100644 skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolderFactory.java diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractASyncMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractASyncMemberProvider.java deleted file mode 100644 index 23207cf880..0000000000 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractASyncMemberProvider.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.a.eye.skywalking.collector.actor; - -import akka.actor.ActorRef; -import com.a.eye.skywalking.collector.queue.DaemonThreadFactory; -import com.a.eye.skywalking.collector.queue.MessageHolder; -import com.lmax.disruptor.EventFactory; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; - -import java.lang.reflect.Constructor; - -/** - * @author pengys5 - */ -public abstract class AbstractASyncMemberProvider { - - private RingBuffer ringBuffer; - - public abstract Class memberClass(); - - public T createWorker(EventFactory eventFactory, ActorRef actorRef) throws Exception { - if (memberClass() == null) { - throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()"); - } - - Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class}); - memberConstructor.setAccessible(true); - T member = (T) memberConstructor.newInstance(actorRef); - - // Specify the size of the ring buffer, must be power of 2. - int bufferSize = 1024; - // Construct the Disruptor - Disruptor disruptor = new Disruptor(eventFactory, bufferSize, DaemonThreadFactory.INSTANCE); - // Connect the handler - disruptor.handleEventsWith(member); - // Start the Disruptor, starts all threads running - disruptor.start(); - // Get the ring buffer from the Disruptor to be used for publishing. - ringBuffer = disruptor.getRingBuffer(); - return member; - } - - public void onData(MessageHolder message) { - long sequence = ringBuffer.next(); - try { - ringBuffer.get(sequence).setMessage(message); - } finally { - ringBuffer.publish(sequence); - } - } - - /** - * Use {@link #memberClass()} method returned class's simple name as a role name. - * - * @return is role of Worker - */ - protected String roleName() { - return memberClass().getSimpleName(); - } -} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMember.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMember.java new file mode 100644 index 0000000000..bb957c54ee --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMember.java @@ -0,0 +1,33 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.queue.MessageHolder; +import com.lmax.disruptor.RingBuffer; + +/** + * @author pengys5 + */ +public abstract class AbstractAsyncMember extends AbstractMember { + + private RingBuffer ringBuffer; + + public AbstractAsyncMember(RingBuffer ringBuffer, ActorRef actorRef) { + super(actorRef); + this.ringBuffer = ringBuffer; + } + + public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { + Object message = event.getMessage(); + event.reset(); + receive(message); + } + + public void beTold(Object message) throws Exception { + long sequence = ringBuffer.next(); + try { + ringBuffer.get(sequence).setMessage(message); + } finally { + ringBuffer.publish(sequence); + } + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMemberProvider.java new file mode 100644 index 0000000000..452cedfcb7 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractAsyncMemberProvider.java @@ -0,0 +1,41 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.queue.DaemonThreadFactory; +import com.a.eye.skywalking.collector.queue.MessageHolder; +import com.a.eye.skywalking.collector.queue.MessageHolderFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; + +import java.lang.reflect.Constructor; + +/** + * @author pengys5 + */ +public abstract class AbstractAsyncMemberProvider extends AbstractMemberProvider { + + public abstract int queueSize(); + + @Override + public T createWorker(ActorRef actorRef) throws Exception { + if (memberClass() == null) { + throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()"); + } + + // Specify the size of the ring buffer, must be power of 2. + int bufferSize = queueSize(); + // Construct the Disruptor + Disruptor disruptor = new Disruptor(MessageHolderFactory.INSTANCE, bufferSize, DaemonThreadFactory.INSTANCE); + // Start the Disruptor, starts all threads running + RingBuffer ringBuffer = disruptor.start(); + + Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{RingBuffer.class, ActorRef.class}); + memberConstructor.setAccessible(true); + T member = (T) memberConstructor.newInstance(ringBuffer, actorRef); + + // Connect the handler + disruptor.handleEventsWith(member); + return member; + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java index ae9d5d6bb2..eb39ecf5de 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMember.java @@ -13,13 +13,13 @@ import java.util.List; /** * @author pengys5 */ -public abstract class AbstractMember implements EventHandler> { +public abstract class AbstractMember implements EventHandler { private Logger logger = LogManager.getFormatterLogger(AbstractMember.class); private ActorRef actorRef; - public ActorRef getSelf() { + private ActorRef getSelf() { return actorRef; } @@ -27,6 +27,8 @@ public abstract class AbstractMember implements EventHandler this.actorRef = actorRef; } + public abstract void beTold(Object message) throws Exception; + /** * Receive the message to analyse. * @@ -35,12 +37,6 @@ public abstract class AbstractMember implements EventHandler */ public abstract void receive(Object message) throws Exception; - public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { - T message = event.getMessage(); - event.reset(); - receive(message); - } - /** * Send analysed data to next Worker. * @@ -49,7 +45,7 @@ public abstract class AbstractMember implements EventHandler * @param message is the data used to send to next worker. * @throws Exception */ - public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Exception { + public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Exception { logger.debug("worker provider: %s ,role name: %s", targetWorkerProvider.getClass().getName(), targetWorkerProvider.roleName()); List availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName()); selector.select(availableWorks, message).tell(message, getSelf()); diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java new file mode 100644 index 0000000000..a78824a807 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractMemberProvider.java @@ -0,0 +1,13 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; + +/** + * @author pengys5 + */ +public abstract class AbstractMemberProvider { + + public abstract Class memberClass(); + + public abstract T createWorker(ActorRef actorRef) throws Exception; +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMember.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMember.java new file mode 100644 index 0000000000..68adfd1632 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMember.java @@ -0,0 +1,23 @@ +package com.a.eye.skywalking.collector.actor; + +import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.queue.MessageHolder; + +/** + * @author pengys5 + */ +public abstract class AbstractSyncMember extends AbstractMember { + + public AbstractSyncMember(ActorRef actorRef) { + super(actorRef); + } + + @Override + public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) throws Exception { + } + + @Override + public void beTold(Object message) throws Exception { + receive(message); + } +} diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java index a1016e4f4c..fb83086a74 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractSyncMemberProvider.java @@ -7,27 +7,17 @@ import java.lang.reflect.Constructor; /** * @author pengys5 */ -public abstract class AbstractSyncMemberProvider { - - public abstract Class memberClass(); +public abstract class AbstractSyncMemberProvider extends AbstractMemberProvider { + @Override public T createWorker(ActorRef actorRef) throws Exception { if (memberClass() == null) { throw new IllegalArgumentException("cannot createInstance() with nothing obtained from memberClass()"); } - Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class}); + Constructor memberConstructor = memberClass().getDeclaredConstructor(new Class[]{ActorRef.class}); memberConstructor.setAccessible(true); T member = (T) memberConstructor.newInstance(actorRef); return member; } - - /** - * Use {@link #memberClass()} method returned class's simple name as a role name. - * - * @return is role of Worker - */ - protected String roleName() { - return memberClass().getSimpleName(); - } } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java index d66f88473a..670d0a1ba8 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/AbstractWorker.java @@ -38,7 +38,7 @@ import java.util.List; * } * }}} */ -public abstract class AbstractWorker extends UntypedActor { +public abstract class AbstractWorker extends UntypedActor { private Logger logger = LogManager.getFormatterLogger(AbstractWorker.class); @@ -96,7 +96,7 @@ public abstract class AbstractWorker extends UntypedActor { * @param message is the data used to send to next worker. * @throws Throwable */ - public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, T message) throws Throwable { + public void tell(AbstractWorkerProvider targetWorkerProvider, WorkerSelector selector, Object message) throws Throwable { List availableWorks = WorkersRefCenter.INSTANCE.availableWorks(targetWorkerProvider.roleName()); selector.select(availableWorks, message).tell(message, getSelf()); } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java index 3582d868c0..3e6d20f844 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/actor/selector/WorkerSelector.java @@ -13,7 +13,7 @@ import java.util.List; * * @author wusheng */ -public interface WorkerSelector { +public interface WorkerSelector { /** * select a {@link WorkerRef} from a {@link WorkerRef} list. * @@ -21,5 +21,5 @@ public interface WorkerSelector { * @param message the {@link AbstractWorker} is going to send. * @return the selected {@link WorkerRef} */ - WorkerRef select(List members, T message); + WorkerRef select(List members, Object message); } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java index 779c2346b4..fb83f5e663 100644 --- a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolder.java @@ -3,14 +3,14 @@ package com.a.eye.skywalking.collector.queue; /** * @author pengys5 */ -public class MessageHolder { - private T message; +public class MessageHolder { + private Object message; - public T getMessage() { + public Object getMessage() { return message; } - public void setMessage(T message) { + public void setMessage(Object message) { this.message = message; } diff --git a/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolderFactory.java b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolderFactory.java new file mode 100644 index 0000000000..972843e468 --- /dev/null +++ b/skywalking-collector/skywalking-collector-cluster/src/main/java/com/a/eye/skywalking/collector/queue/MessageHolderFactory.java @@ -0,0 +1,15 @@ +package com.a.eye.skywalking.collector.queue; + +import com.lmax.disruptor.EventFactory; + +/** + * @author pengys5 + */ +public class MessageHolderFactory implements EventFactory { + + public static MessageHolderFactory INSTANCE = new MessageHolderFactory(); + + public MessageHolder newInstance() { + return new MessageHolder(); + } +} diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java index bad069c990..76f5a3f4b4 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceMember.java @@ -1,9 +1,12 @@ package com.a.eye.skywalking.collector.worker; import akka.actor.ActorRef; +import com.a.eye.skywalking.collector.actor.AbstractAsyncMember; import com.a.eye.skywalking.collector.actor.AbstractMember; +import com.a.eye.skywalking.collector.queue.MessageHolder; import com.a.eye.skywalking.collector.worker.storage.EsClient; import com.google.gson.JsonObject; +import com.lmax.disruptor.RingBuffer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -16,7 +19,7 @@ import java.util.Map; /** * @author pengys5 */ -public abstract class PersistenceMember extends AbstractMember { +public abstract class PersistenceMember extends AbstractAsyncMember { private Logger logger = LogManager.getFormatterLogger(PersistenceMember.class); @@ -24,8 +27,8 @@ public abstract class PersistenceMember extends AbstractMember { private Map persistenceData = new HashMap(); - public PersistenceMember(ActorRef actorRef) { - super(actorRef); + public PersistenceMember(RingBuffer ringBuffer, ActorRef actorRef) { + super(ringBuffer, actorRef); } public abstract String esIndex(); @@ -54,7 +57,6 @@ public abstract class PersistenceMember extends AbstractMember { if (message instanceof PersistenceCommand) { persistence(false); } else { - logger.debug("receive message"); analyse(message); } } @@ -62,25 +64,11 @@ public abstract class PersistenceMember extends AbstractMember { private void persistence(boolean dataFull) { long now = System.currentTimeMillis(); if (now - lastPersistenceTimestamp > 5000 || dataFull) { - boolean success = saveToEs(); + boolean success = EsClient.saveToEs(esIndex(), esType(), persistenceData); if (success) { persistenceData.clear(); lastPersistenceTimestamp = now; } } } - - private boolean saveToEs() { - TransportClient client = EsClient.client(); - BulkRequestBuilder bulkRequest = client.prepareBulk(); - - for (Map.Entry entry : persistenceData.entrySet()) { - String id = entry.getKey(); - JsonObject data = entry.getValue(); - bulkRequest.add(client.prepareIndex(esIndex(), esType(), id).setSource(data.toString())); - } - - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); - return !bulkResponse.hasFailures(); - } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java index 78f0ee48c9..8380b96397 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/PersistenceWorker.java @@ -5,9 +5,6 @@ import com.a.eye.skywalking.collector.worker.storage.EsClient; import com.google.gson.JsonObject; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.client.transport.TransportClient; import java.util.HashMap; import java.util.Map; @@ -15,7 +12,7 @@ import java.util.Map; /** * @author pengys5 */ -public abstract class PersistenceWorker extends AbstractWorker { +public abstract class PersistenceWorker extends AbstractWorker { private Logger logger = LogManager.getFormatterLogger(PersistenceWorker.class); @@ -57,25 +54,11 @@ public abstract class PersistenceWorker extends AbstractWorker { private void persistence(boolean dataFull) { long now = System.currentTimeMillis(); if (now - lastPersistenceTimestamp > 5000 || dataFull) { - boolean success = saveToEs(); + boolean success = EsClient.saveToEs(esIndex(), esType(), persistenceData); if (success) { persistenceData.clear(); lastPersistenceTimestamp = now; } } } - - private boolean saveToEs() { - TransportClient client = EsClient.client(); - BulkRequestBuilder bulkRequest = client.prepareBulk(); - - for (Map.Entry entry : persistenceData.entrySet()) { - String id = entry.getKey(); - JsonObject data = entry.getValue(); - bulkRequest.add(client.prepareIndex(esIndex(), esType(), id).setSource(data.toString())); - } - - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); - return !bulkResponse.hasFailures(); - } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java index 82f6f634ac..c23cc6a87d 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/WorkerConfig.java @@ -37,4 +37,9 @@ public class WorkerConfig extends ClusterConfig { } } + public static class Queue { + public static class TraceSegmentRecordMember { + public static int Size = 32; + } + } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java index 7f32b11a68..32fc7e4f70 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/ApplicationMember.java @@ -1,7 +1,7 @@ package com.a.eye.skywalking.collector.worker.application; import akka.actor.ActorRef; -import com.a.eye.skywalking.collector.actor.AbstractMember; +import com.a.eye.skywalking.collector.actor.AbstractSyncMember; import com.a.eye.skywalking.collector.actor.AbstractSyncMemberProvider; import com.a.eye.skywalking.collector.actor.selector.RollingSelector; import com.a.eye.skywalking.collector.worker.application.metric.TraceSegmentRecordMember; @@ -19,14 +19,15 @@ import org.apache.logging.log4j.Logger; /** * @author pengys5 */ -public class ApplicationMember extends AbstractMember { +public class ApplicationMember extends AbstractSyncMember { private Logger logger = LogManager.getFormatterLogger(ApplicationMember.class); - private TraceSegmentRecordMember traceSegmentRecordMember = TraceSegmentRecordMember.Factory.INSTANCE.createWorker(TraceSegmentRecordMember.MessageFactory.INSTANCE, getSelf()); + private TraceSegmentRecordMember recordMember; public ApplicationMember(ActorRef actorRef) throws Exception { super(actorRef); + recordMember = TraceSegmentRecordMember.Factory.INSTANCE.createWorker(actorRef); } @Override @@ -34,7 +35,7 @@ public class ApplicationMember extends AbstractMember { if (message instanceof TraceSegment) { logger.debug("begin translate TraceSegment Object to JsonObject"); TraceSegment traceSegment = (TraceSegment) message; - traceSegmentRecordMember.receive(traceSegment); + recordMember.beTold(traceSegment); sendToDAGNodePersistence(traceSegment); sendToNodeInstancePersistence(traceSegment); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java index a8e5bbe879..f6bb645673 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/metric/TraceSegmentRecordMember.java @@ -2,18 +2,18 @@ package com.a.eye.skywalking.collector.worker.application.metric; import akka.actor.ActorRef; -import com.a.eye.skywalking.collector.actor.AbstractASyncMemberProvider; -import com.a.eye.skywalking.collector.actor.AbstractMember; +import com.a.eye.skywalking.collector.actor.AbstractAsyncMemberProvider; import com.a.eye.skywalking.collector.actor.selector.RollingSelector; import com.a.eye.skywalking.collector.queue.MessageHolder; import com.a.eye.skywalking.collector.worker.PersistenceMember; +import com.a.eye.skywalking.collector.worker.WorkerConfig; import com.a.eye.skywalking.collector.worker.application.persistence.TraceSegmentRecordPersistence; import com.a.eye.skywalking.trace.Span; import com.a.eye.skywalking.trace.TraceSegment; import com.a.eye.skywalking.trace.TraceSegmentRef; import com.google.gson.JsonArray; import com.google.gson.JsonObject; -import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.RingBuffer; import java.util.List; import java.util.Map; @@ -33,8 +33,8 @@ public class TraceSegmentRecordMember extends PersistenceMember { return "trace_segment"; } - public TraceSegmentRecordMember(ActorRef actorRef) throws Throwable { - super(actorRef); + public TraceSegmentRecordMember(RingBuffer ringBuffer, ActorRef actorRef) { + super(ringBuffer, actorRef); } @Override @@ -47,16 +47,13 @@ public class TraceSegmentRecordMember extends PersistenceMember { } } - public static class MessageFactory implements EventFactory { - public static MessageFactory INSTANCE = new MessageFactory(); + public static class Factory extends AbstractAsyncMemberProvider { + public static Factory INSTANCE = new Factory(); - public MessageHolder newInstance() { - return new MessageHolder(); + @Override + public int queueSize() { + return WorkerConfig.Queue.TraceSegmentRecordMember.Size; } - } - - public static class Factory extends AbstractASyncMemberProvider { - public static Factory INSTANCE = new Factory(); @Override public Class memberClass() { diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java index 1e4004c678..835e460ecc 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/DAGNodePersistence.java @@ -12,7 +12,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class DAGNodePersistence extends PersistenceWorker { +public class DAGNodePersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(DAGNodePersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java index 0edb3c4be2..40051efcda 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/NodeInstancePersistence.java @@ -10,7 +10,7 @@ import org.apache.logging.log4j.Logger; /** * @author pengys5 */ -public class NodeInstancePersistence extends PersistenceWorker { +public class NodeInstancePersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(NodeInstancePersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java index af7926dfb0..3a80b06a98 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseCostPersistence.java @@ -12,7 +12,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class ResponseCostPersistence extends PersistenceWorker { +public class ResponseCostPersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(ResponseCostPersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java index 164e73ec25..a5dc5bb509 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/application/persistence/ResponseSummaryPersistence.java @@ -12,7 +12,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class ResponseSummaryPersistence extends PersistenceWorker { +public class ResponseSummaryPersistence extends PersistenceWorker { private Logger logger = LogManager.getFormatterLogger(ResponseSummaryPersistence.class); diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java index afff42b305..546150c8f5 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/applicationref/presistence/DAGNodeRefPersistence.java @@ -10,7 +10,7 @@ import java.io.Serializable; /** * @author pengys5 */ -public class DAGNodeRefPersistence extends PersistenceWorker { +public class DAGNodeRefPersistence extends PersistenceWorker { @Override public String esIndex() { diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java index 7950925218..7e3517b148 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/receiver/TraceSegmentReceiver.java @@ -16,11 +16,14 @@ public class TraceSegmentReceiver extends AbstractWorker { private Logger logger = LogManager.getFormatterLogger(TraceSegmentReceiver.class); - private ApplicationMember applicationMember = ApplicationMember.Factory.INSTANCE.createWorker(getSelf()); + private ApplicationMember applicationMember; + + private ApplicationRefMember applicationRefMember; - private ApplicationRefMember applicationRefMember = ApplicationRefMember.Factory.INSTANCE.createWorker(getSelf()); public TraceSegmentReceiver() throws Exception { + applicationMember = ApplicationMember.Factory.INSTANCE.createWorker(getSelf()); + applicationRefMember = ApplicationRefMember.Factory.INSTANCE.createWorker(getSelf()); } @Override @@ -29,8 +32,8 @@ public class TraceSegmentReceiver extends AbstractWorker { TraceSegment traceSegment = (TraceSegment) message; logger.debug("receive message instanceof TraceSegment, traceSegmentId is %s", traceSegment.getTraceSegmentId()); - applicationMember.receive(traceSegment); - applicationRefMember.receive(traceSegment); + applicationMember.beTold(traceSegment); + applicationRefMember.beTold(traceSegment); } } diff --git a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java index 5f84d4ded2..3a8f6f0986 100644 --- a/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java +++ b/skywalking-collector/skywalking-collector-worker/src/main/java/com/a/eye/skywalking/collector/worker/storage/EsClient.java @@ -1,5 +1,8 @@ package com.a.eye.skywalking.collector.worker.storage; +import com.google.gson.JsonObject; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; @@ -7,6 +10,7 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Map; /** * @author pengys5 @@ -24,7 +28,16 @@ public class EsClient { .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300)); } - public static TransportClient client() { - return client; + public static boolean saveToEs(String esIndex, String esType, Map persistenceData) { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + + for (Map.Entry entry : persistenceData.entrySet()) { + String id = entry.getKey(); + JsonObject data = entry.getValue(); + bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(data.toString())); + } + + BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + return !bulkResponse.hasFailures(); } } -- GitLab