AggregationWorker.java 2.7 KB
Newer Older
P
peng-yongsheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright 2017, OpenSkywalking Organization All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Project repository: https://github.com/OpenSkywalking/skywalking
 */

package org.skywalking.apm.collector.stream.worker.impl;

import org.skywalking.apm.collector.core.data.Data;
P
peng-yongsheng 已提交
22
import org.skywalking.apm.collector.core.module.ModuleManager;
P
peng-yongsheng 已提交
23 24 25 26 27 28 29 30 31
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author peng-yongsheng
 */
32
public abstract class AggregationWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
P
peng-yongsheng 已提交
33 34 35 36 37 38

    private final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);

    private DataCache dataCache;
    private int messageNum;

P
peng-yongsheng 已提交
39 40
    public AggregationWorker(ModuleManager moduleManager) {
        super(moduleManager);
41
        this.dataCache = new DataCache();
P
peng-yongsheng 已提交
42 43
    }

44 45 46
    @Override protected final void onWork(INPUT message) throws WorkerException {
        messageNum++;
        aggregate(message);
P
peng-yongsheng 已提交
47

48 49 50 51 52
        if (messageNum >= 100) {
            sendToNext();
            messageNum = 0;
        }
        if (message.isEndOfBatch()) {
P
peng-yongsheng 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65
            sendToNext();
        }
    }

    private void sendToNext() throws WorkerException {
        dataCache.switchPointer();
        while (dataCache.getLast().isWriting()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                throw new WorkerException(e.getMessage(), e);
            }
        }
66
        dataCache.getLast().collection().forEach((String id, Data data) -> {
67 68
            logger.debug(data.toString());
            onNext((OUTPUT)data);
P
peng-yongsheng 已提交
69 70 71 72
        });
        dataCache.finishReadingLast();
    }

73
    private void aggregate(INPUT message) {
P
peng-yongsheng 已提交
74
        dataCache.writing();
75
        if (dataCache.containsKey(message.getId())) {
76
            dataCache.get(message.getId()).mergeData(message);
P
peng-yongsheng 已提交
77
        } else {
78
            dataCache.put(message.getId(), message);
P
peng-yongsheng 已提交
79 80 81 82
        }
        dataCache.finishWriting();
    }
}