提交 678f3009 编写于 作者: wu-sheng's avatar wu-sheng

Support to set consume cycle. And invoke the existed callbacks when the...

Support to set consume cycle. And invoke the existed callbacks when the message blocked at first time.
上级 c45cdcb8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
/**
* @author wu-sheng
*/
public class BlockingDataCarrier<T> {
private Channels<T> channels;
BlockingDataCarrier(Channels<T> channels) {
this.channels = channels;
}
public void addCallback(QueueBlockingCallback<T> callback) {
this.channels.addCallback(callback);
}
}
......@@ -16,21 +16,17 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
/**
* DataCarrier main class.
* use this instance to set Producer/Consumer Model
* <p>
* Created by wusheng on 2016/10/25.
* DataCarrier main class. use this instance to set Producer/Consumer Model.
*/
public class DataCarrier<T> {
private final int bufferSize;
......@@ -45,8 +41,8 @@ public class DataCarrier<T> {
}
/**
* set a new IDataPartitioner.
* It will cover the current one or default one.(Default is {@link SimpleRollingPartitioner)}
* set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link
* SimpleRollingPartitioner)}
*
* @param dataPartitioner
* @return
......@@ -57,8 +53,7 @@ public class DataCarrier<T> {
}
/**
* override the strategy at runtime.
* Notice, {@link Channels<T>} will override several channels one by one.
* override the strategy at runtime. Notice, {@link Channels<T>} will override several channels one by one.
*
* @param strategy
*/
......@@ -67,6 +62,11 @@ public class DataCarrier<T> {
return this;
}
public BlockingDataCarrier<T> toBlockingDataCarrier() {
this.channels.setStrategy(BufferStrategy.BLOCKING);
return new BlockingDataCarrier<T>(this.channels);
}
/**
* produce data to buffer, using the givven {@link BufferStrategy}.
*
......@@ -84,38 +84,59 @@ public class DataCarrier<T> {
}
/**
* set consumers to this Carrier.
* consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
*
* @param consumerClass class of consumer
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
if (consumerPool != null) {
consumerPool.close();
}
consumerPool = new ConsumerPool<T>(this.channels, consumerClass, num);
consumerPool = new ConsumerPool<T>(this.channels, consumerClass, num, consumeCycle);
consumerPool.begin();
return this;
}
/**
* set consumers to this Carrier.
* consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work with 20
* millis consume cycle.
*
* @param consumerClass class of consumer
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
return this.consume(consumerClass, num, 20);
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
* @param num number of consumer threads
* @return
*/
public DataCarrier consume(IConsumer<T> consumer, int num) {
public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
if (consumerPool != null) {
consumerPool.close();
}
consumerPool = new ConsumerPool<T>(this.channels, consumer, num);
consumerPool = new ConsumerPool<T>(this.channels, consumer, num, consumeCycle);
consumerPool.begin();
return this;
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work with 20
* millis consume cycle.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
* @param num number of consumer threads
* @return
*/
public DataCarrier consume(IConsumer<T> consumer, int num) {
return this.consume(consumer, num, 20);
}
/**
* shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
* BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumers maybe cause blocking when producing.
......
......@@ -16,10 +16,11 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
/**
......@@ -29,23 +30,36 @@ public class Buffer<T> {
private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
private List<QueueBlockingCallback<T>> callbacks;
Buffer(int bufferSize, BufferStrategy strategy) {
buffer = new Object[bufferSize];
this.strategy = strategy;
index = new AtomicRangeInteger(0, bufferSize);
callbacks = new LinkedList<QueueBlockingCallback<T>>();
}
void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
void addCallback(QueueBlockingCallback<T> callback) {
callbacks.add(callback);
}
boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case BLOCKING:
boolean isFirstTimeBlocking = true;
while (buffer[i] != null) {
if (isFirstTimeBlocking) {
isFirstTimeBlocking = false;
for (QueueBlockingCallback<T> callback : callbacks) {
callback.notify(data);
}
}
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
......
......@@ -16,17 +16,14 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
/**
* Channels of Buffer
* It contais all buffer data which belongs to this channel.
* It supports several strategy when buffer is full. The Default is BLOCKING
* <p>
* Created by wusheng on 2016/10/25.
* Channels of Buffer It contais all buffer data which belongs to this channel. It supports several strategy when buffer
* is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
*/
public class Channels<T> {
private final Buffer<T>[] bufferChannels;
......@@ -87,4 +84,10 @@ public class Channels<T> {
public Buffer<T> getBuffer(int index) {
return this.bufferChannels[index];
}
public void addCallback(QueueBlockingCallback<T> callback) {
for (Buffer<T> channel : bufferChannels) {
channel.addCallback(callback);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier.callback;
/**
* Notify when the queue, which is in blocking strategy, has be blocked.
*
* @author wu-sheng
*/
public interface QueueBlockingCallback<T> {
void notify(T message);
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
......@@ -25,9 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* Pool of consumers
* <p>
* Created by wusheng on 2016/10/25.
* Pool of consumers <p> Created by wusheng on 2016/10/25.
*/
public class ConsumerPool<T> {
private boolean running;
......@@ -35,19 +32,19 @@ public class ConsumerPool<T> {
private Channels<T> channels;
private ReentrantLock lock;
public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num) {
public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num) {
public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype);
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype, consumeCycle);
consumerThreads[i].setDaemon(true);
}
......
......@@ -30,12 +30,14 @@ public class ConsumerThread<T> extends Thread {
private volatile boolean running;
private IConsumer<T> consumer;
private List<DataSource> dataSources;
private long consumeCycle;
ConsumerThread(String threadName, IConsumer<T> consumer) {
ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
super(threadName);
this.consumer = consumer;
running = false;
dataSources = new LinkedList<DataSource>();
this.consumeCycle = consumeCycle;
}
/**
......@@ -67,7 +69,7 @@ public class ConsumerThread<T> extends Thread {
if (!hasData) {
try {
Thread.sleep(20);
Thread.sleep(consumeCycle);
} catch (InterruptedException e) {
}
}
......
......@@ -34,7 +34,7 @@ public class ConsumerPoolTest {
@Test
public void testBeginConsumerPool() throws IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
pool.begin();
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
......@@ -46,7 +46,7 @@ public class ConsumerPoolTest {
@Test
public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
pool.begin();
Thread.sleep(5000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册