/* * Copyright 2017, OpenSkywalking Organization All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * Project repository: https://github.com/OpenSkywalking/skywalking */ package org.skywalking.apm.commons.datacarrier.consumer; import java.util.LinkedList; import java.util.List; import org.skywalking.apm.commons.datacarrier.buffer.Buffer; /** * Created by wusheng on 2016/10/25. */ public class ConsumerThread extends Thread { private volatile boolean running; private IConsumer consumer; private List dataSources; ConsumerThread(String threadName, IConsumer consumer) { super(threadName); this.consumer = consumer; running = false; dataSources = new LinkedList(); } /** * add partition of buffer to consume * * @param sourceBuffer * @param start * @param end */ void addDataSource(Buffer sourceBuffer, int start, int end) { this.dataSources.add(new DataSource(sourceBuffer, start, end)); } /** * add whole buffer to consume * * @param sourceBuffer */ void addDataSource(Buffer sourceBuffer) { this.dataSources.add(new DataSource(sourceBuffer, 0, sourceBuffer.getBufferSize())); } @Override public void run() { running = true; while (running) { boolean hasData = consume(); if (!hasData) { try { Thread.sleep(20); } catch (InterruptedException e) { } } } // consumer thread is going to stop // consume the last time consume(); consumer.onExit(); } private boolean consume() { boolean hasData = false; LinkedList consumeList = new LinkedList(); for (DataSource dataSource : dataSources) { LinkedList data = dataSource.obtain(); if (data.size() == 0) { continue; } for (T element : data) { consumeList.add(element); } hasData = true; } if (consumeList.size() > 0) { try { consumer.consume(consumeList); } catch (Throwable t) { consumer.onError(consumeList, t); } } return hasData; } void shutdown() { running = false; } /** * DataSource is a refer to {@link Buffer}. */ class DataSource { private Buffer sourceBuffer; private int start; private int end; DataSource(Buffer sourceBuffer, int start, int end) { this.sourceBuffer = sourceBuffer; this.start = start; this.end = end; } LinkedList obtain() { return sourceBuffer.obtain(start, end); } } }