提交 b86426c9 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Add consumer pool to Datacarrier (#2245)

* Extend consume pool to data carrier.

* Change indicator aggregate worker and persistent worker to use default consumer pool.

* Fix CI.
上级 4c642906
......@@ -18,12 +18,9 @@
package org.apache.skywalking.apm.commons.datacarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.apm.commons.datacarrier.partition.*;
/**
* DataCarrier main class. use this instance to set Producer/Consumer Model.
......@@ -32,7 +29,7 @@ public class DataCarrier<T> {
private final int bufferSize;
private final int channelSize;
private Channels<T> channels;
private ConsumerPool<T> consumerPool;
private IDriver driver;
private String name;
public DataCarrier(int channelSize, int bufferSize) {
......@@ -47,7 +44,8 @@ public class DataCarrier<T> {
}
/**
* set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link SimpleRollingPartitioner}
* set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link
* SimpleRollingPartitioner}
*
* @param dataPartitioner to partition data into different channel by some rules.
* @return DataCarrier instance for chain
......@@ -79,8 +77,8 @@ public class DataCarrier<T> {
* @return false means produce data failure. The data will not be consumed.
*/
public boolean produce(T data) {
if (consumerPool != null) {
if (!consumerPool.isRunning()) {
if (driver != null) {
if (!driver.isRunning(channels)) {
return false;
}
}
......@@ -89,22 +87,22 @@ public class DataCarrier<T> {
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
* set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
*
* @param consumerClass class of consumer
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
if (consumerPool != null) {
consumerPool.close();
if (driver != null) {
driver.close(channels);
}
consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
consumerPool.begin();
driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle);
driver.begin(channels);
return this;
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
* set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
* millis consume cycle.
*
* @param consumerClass class of consumer
......@@ -115,23 +113,23 @@ public class DataCarrier<T> {
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
* set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
* @param num number of consumer threads
* @return
*/
public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
if (consumerPool != null) {
consumerPool.close();
if (driver != null) {
driver.close(channels);
}
consumerPool = new ConsumerPool<T>(this.name, this.channels, consumer, num, consumeCycle);
consumerPool.begin();
driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle);
driver.begin(channels);
return this;
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
* set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
* millis consume cycle.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
......@@ -142,14 +140,28 @@ public class DataCarrier<T> {
return this.consume(consumer, num, 20);
}
/**
* Set a consumer pool to manage the channels of this DataCarrier. Then consumerPool could use its own consuming
* model to adjust the consumer thread and throughput.
*
* @param consumerPool
* @return
*/
public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
driver = consumerPool;
consumerPool.add(this.name, channels, consumer);
driver.begin(channels);
return this;
}
/**
* shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
* BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumers maybe cause blocking when producing.
* Better way to change consumers are use {@link DataCarrier#consume}
* BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumeDriver maybe cause blocking when producing.
* Better way to change consumeDriver are use {@link DataCarrier#consume}
*/
public void shutdownConsumers() {
if (consumerPool != null) {
consumerPool.close();
if (driver != null) {
driver.close(channels);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
/**
* Pool of consumers <p> Created by wusheng on 2016/10/25.
*/
public class ConsumeDriver<T> implements IDriver {
private boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;
public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
long consumeCycle) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
private ConsumeDriver(Channels<T> channels, int num) {
running = false;
this.channels = channels;
consumerThreads = new ConsumerThread[num];
lock = new ReentrantLock();
}
private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
try {
IConsumer<T> inst = consumerClass.newInstance();
inst.init();
return inst;
} catch (InstantiationException e) {
throw new ConsumerCannotBeCreatedException(e);
} catch (IllegalAccessException e) {
throw new ConsumerCannotBeCreatedException(e);
}
}
@Override
public void begin(Channels channels) {
if (running) {
return;
}
try {
lock.lock();
this.allocateBuffer2Thread();
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.start();
}
running = true;
} finally {
lock.unlock();
}
}
@Override
public boolean isRunning(Channels channels) {
return running;
}
private void allocateBuffer2Thread() {
int channelSize = this.channels.getChannelSize();
if (channelSize < consumerThreads.length) {
/**
* if consumerThreads.length > channelSize
* each channel will be process by several consumers.
*/
ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
int index = threadIndex % channelSize;
if (threadAllocation[index] == null) {
threadAllocation[index] = new ArrayList<Integer>();
}
threadAllocation[index].add(threadIndex);
}
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
Buffer<T> channel = this.channels.getBuffer(channelIndex);
int bufferSize = channel.getBufferSize();
int step = bufferSize / threadAllocationPerChannel.size();
for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
int threadIndex = threadAllocationPerChannel.get(i);
int start = i * step;
int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
consumerThreads[threadIndex].addDataSource(channel, start, end);
}
}
} else {
/**
* if consumerThreads.length < channelSize
* each consumer will process several channels.
*
* if consumerThreads.length == channelSize
* each consumer will process one channel.
*/
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
int consumerIndex = channelIndex % consumerThreads.length;
consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
}
}
}
@Override
public void close(Channels channels) {
try {
lock.lock();
this.running = false;
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.shutdown();
}
} finally {
lock.unlock();
}
}
}
......@@ -18,131 +18,15 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* Pool of consumers <p> Created by wusheng on 2016/10/25.
* The Consumer pool could support data consumer from multiple {@link DataCarrier}s,
* by using different consume thread management models.
*
* @author wusheng
*/
public class ConsumerPool<T> {
private boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;
public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
long consumeCycle) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
private ConsumerPool(Channels<T> channels, int num) {
running = false;
this.channels = channels;
consumerThreads = new ConsumerThread[num];
lock = new ReentrantLock();
}
private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
try {
IConsumer<T> inst = consumerClass.newInstance();
inst.init();
return inst;
} catch (InstantiationException e) {
throw new ConsumerCannotBeCreatedException(e);
} catch (IllegalAccessException e) {
throw new ConsumerCannotBeCreatedException(e);
}
}
public void begin() {
if (running) {
return;
}
try {
lock.lock();
this.allocateBuffer2Thread();
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.start();
}
running = true;
} finally {
lock.unlock();
}
}
public boolean isRunning() {
return running;
}
private void allocateBuffer2Thread() {
int channelSize = this.channels.getChannelSize();
if (channelSize < consumerThreads.length) {
/**
* if consumerThreads.length > channelSize
* each channel will be process by several consumers.
*/
ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
int index = threadIndex % channelSize;
if (threadAllocation[index] == null) {
threadAllocation[index] = new ArrayList<Integer>();
}
threadAllocation[index].add(threadIndex);
}
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
Buffer<T> channel = this.channels.getBuffer(channelIndex);
int bufferSize = channel.getBufferSize();
int step = bufferSize / threadAllocationPerChannel.size();
for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
int threadIndex = threadAllocationPerChannel.get(i);
int start = i * step;
int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
consumerThreads[threadIndex].addDataSource(channel, start, end);
}
}
} else {
/**
* if consumerThreads.length < channelSize
* each consumer will process several channels.
*
* if consumerThreads.length == channelSize
* each consumer will process one channel.
*/
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
int consumerIndex = channelIndex % consumerThreads.length;
consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
}
}
}
public void close() {
try {
lock.lock();
this.running = false;
for (ConsumerThread consumerThread : consumerThreads) {
consumerThread.shutdown();
}
} finally {
lock.unlock();
}
}
public interface ConsumerPool extends IDriver {
void add(String name, Channels channels, IConsumer consumer);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.*;
import java.util.concurrent.Callable;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* Consumer Pool Factory provides global management for all Consumer Pool.
*
* @author wusheng
*/
public enum ConsumerPoolFactory {
INSTANCE;
private Map<String, ConsumerPool> pools;
ConsumerPoolFactory() {
pools = new HashMap<String, ConsumerPool>();
}
public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception {
if (pools.containsKey(poolName)) {
return false;
} else {
pools.put(poolName, creator.call());
return true;
}
}
public ConsumerPool get(String poolName) {
return pools.get(poolName);
}
/**
* Default pool provides the same capabilities as DataCarrier#consume(IConsumer, 1), which alloc one thread for one
* DataCarrier.
*/
public static final ConsumerPool DEFAULT_POOL = new ConsumerPool() {
private Map<Channels, ConsumeDriver> allDrivers = new HashMap<Channels, ConsumeDriver>();
@Override synchronized public void add(String name, Channels channels, IConsumer consumer) {
if (!allDrivers.containsKey(channels)) {
ConsumeDriver consumeDriver = new ConsumeDriver(name, channels, consumer, 1, 20);
allDrivers.put(channels, consumeDriver);
}
}
@Override public boolean isRunning(Channels channels) {
ConsumeDriver driver = allDrivers.get(channels);
return driver == null ? false : driver.isRunning(channels);
}
@Override public void close(Channels channels) {
ConsumeDriver driver = allDrivers.get(channels);
if (driver != null) {
driver.close(channels);
}
}
@Override public void begin(Channels channels) {
ConsumeDriver driver = allDrivers.get(channels);
if (driver != null) {
driver.begin(channels);
}
}
};
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* The driver of consumer.
*
* @author wusheng
*/
public interface IDriver {
boolean isRunning(Channels channels);
void close(Channels channels);
void begin(Channels channels);
}
......@@ -30,28 +30,28 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
/**
* Created by wusheng on 2016/10/26.
*/
public class ConsumerPoolTest {
public class ConsumeDriverTest {
@Test
public void testBeginConsumerPool() throws IllegalAccessException {
public void testBeginConsumeDriver() throws IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
pool.begin();
ConsumeDriver<SampleData> pool = new ConsumeDriver<SampleData>("default", channels, new SampleConsumer(), 2, 20);
pool.begin(channels);
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
Assert.assertEquals(2, threads.length);
Assert.assertTrue(threads[0].isAlive());
Assert.assertTrue(threads[1].isAlive());
}
@Test
public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
public void testCloseConsumeDriver() throws InterruptedException, IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
pool.begin();
ConsumeDriver<SampleData> pool = new ConsumeDriver<SampleData>("default", channels, new SampleConsumer(), 2, 20);
pool.begin(channels);
Thread.sleep(5000);
pool.close();
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
pool.close(channels);
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
Assert.assertEquals(2, threads.length);
Assert.assertFalse((Boolean)MemberModifier.field(ConsumerThread.class, "running").get(threads[0]));
......
......@@ -129,8 +129,8 @@ public class ConsumerTest {
}
private IConsumer getConsumer(DataCarrier<SampleData> carrier) throws IllegalAccessException {
ConsumerPool pool = (ConsumerPool)MemberModifier.field(DataCarrier.class, "consumerPool").get(carrier);
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
ConsumeDriver pool = (ConsumeDriver)MemberModifier.field(DataCarrier.class, "driver").get(carrier);
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
return (IConsumer)MemberModifier.field(ConsumerThread.class, "consumer").get(threads[0]);
}
......
......@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
......@@ -52,7 +52,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
......
......@@ -23,7 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
......@@ -56,7 +56,8 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
}
@Override void onWork(Indicator indicator) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册