AbstractLocalAsyncWorker.java 3.7 KB
Newer Older
1 2 3 4 5 6 7 8
package com.a.eye.skywalking.collector.actor;

import com.a.eye.skywalking.collector.queue.EndOfBatchCommand;
import com.a.eye.skywalking.collector.queue.MessageHolder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;

/**
 * The <code>AbstractLocalAsyncWorker</code> implementations represent workers,
 * which receive local asynchronous messages.
 *
 * @author pengys5
 * @since v3.0-2017
 */
public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {

P
pengys5 已提交
17
    /**
P
pengys5 已提交
18
     * Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
P
pengys5 已提交
19 20 21 22 23 24
     *
     * @param role           The responsibility of worker in cluster, more than one workers can have
     *                       same responsibility which use to provide load balancing ability.
     * @param clusterContext See {@link ClusterWorkerContext}
     * @param selfContext    See {@link LocalWorkerContext}
     */
25 26
    public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
27 28
    }

P
pengys5 已提交
29 30 31 32 33 34
    /**
     * The asynchronous worker always use to persistence data into db, this is the end of the streaming,
     * so usually no need to create the next worker instance at the time of this worker instance create.
     *
     * @throws ProviderNotFoundException When worker provider not found, it will be throw this exception.
     */
35 36 37 38
    @Override
    public void preStart() throws ProviderNotFoundException {
    }

P
pengys5 已提交
39 40 41 42 43 44 45 46
    /**
     * Receive message
     *
     * @param message The persistence data or metric data.
     * @throws Exception The Exception happen in {@link #onWork(Object)}
     */
    final public void allocateJob(Object message) throws Exception {
        onWork(message);
47 48
    }

P
pengys5 已提交
49 50 51 52 53 54 55
    /**
     * The data process logic in this method.
     *
     * @param message Cast the message object to a expect subclass.
     * @throws Exception Don't handle the exception, throw it.
     */
    protected abstract void onWork(Object message) throws Exception;
56

57 58 59 60 61
    static class WorkerWithDisruptor implements EventHandler<MessageHolder> {

        private RingBuffer<MessageHolder> ringBuffer;
        private AbstractLocalAsyncWorker asyncWorker;

P
pengys5 已提交
62
        WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) {
63 64 65 66
            this.ringBuffer = ringBuffer;
            this.asyncWorker = asyncWorker;
        }

P
pengys5 已提交
67 68 69 70 71 72 73 74
        /**
         * Receive the message from disruptor, when message in disruptor is empty, then send the cached data
         * to the next workers.
         *
         * @param event      published to the {@link RingBuffer}
         * @param sequence   of the event being processed
         * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
         */
75 76 77 78
        public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
            try {
                Object message = event.getMessage();
                event.reset();
79 80

                asyncWorker.allocateJob(message);
81
                if (endOfBatch) {
82
                    asyncWorker.allocateJob(new EndOfBatchCommand());
83 84
                }
            } catch (Exception e) {
P
pengys5 已提交
85
                asyncWorker.saveException(e);
86 87 88
            }
        }

P
pengys5 已提交
89 90 91 92 93 94
        /**
         * Push the message into disruptor ring buffer.
         *
         * @param message of the data to process.
         * @throws Exception not used.
         */
95 96 97 98 99 100 101 102 103 104
        public void tell(Object message) throws Exception {
            long sequence = ringBuffer.next();
            try {
                ringBuffer.get(sequence).setMessage(message);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
}