提交 4fda33b1 编写于 作者: wu-sheng's avatar wu-sheng

Support retry when use IF_POSSIBLE strategy.

上级 11ff93ae
......@@ -12,9 +12,11 @@ import org.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
public class Channels<T> {
private final Buffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
private BufferStrategy strategy;
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
this.strategy = strategy;
bufferChannels = new Buffer[channelSize];
for (int i = 0; i < channelSize; i++) {
bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
......@@ -23,7 +25,19 @@ public class Channels<T> {
public boolean save(T data) {
int index = dataPartitioner.partition(bufferChannels.length, data);
return bufferChannels[index].save(data);
int retryCountDown = 1;
if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
int maxRetryCount = dataPartitioner.maxRetryCount();
if (maxRetryCount > 1) {
retryCountDown = maxRetryCount;
}
}
for (; retryCountDown > 0; retryCountDown--) {
if (bufferChannels[index].save(data)) {
return true;
}
}
return false;
}
public void setPartitioner(IDataPartitioner<T> dataPartitioner) {
......
package org.skywalking.apm.commons.datacarrier.partition;
import org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
/**
* Created by wusheng on 2016/10/25.
*/
public interface IDataPartitioner<T> {
int partition(int total, T data);
/**
* @return an integer represents how many times should retry when {@link BufferStrategy#IF_POSSIBLE}.
*
* Less or equal 1, means not support retry.
*/
int maxRetryCount();
}
......@@ -6,8 +6,22 @@ package org.skywalking.apm.commons.datacarrier.partition;
* Created by wusheng on 2016/10/25.
*/
public class ProducerThreadPartitioner<T> implements IDataPartitioner<T> {
private int retryTime = 3;
public ProducerThreadPartitioner() {
}
public ProducerThreadPartitioner(int retryTime) {
this.retryTime = retryTime;
}
@Override
public int partition(int total, T data) {
return (int)Thread.currentThread().getId() % total;
}
@Override
public int maxRetryCount() {
return 1;
}
}
......@@ -14,8 +14,8 @@ public class SimpleRollingPartitioner<T> implements IDataPartitioner<T> {
return Math.abs(i++ % total);
}
public static void main(String[] args) {
SimpleRollingPartitioner s = new SimpleRollingPartitioner();
System.out.print(s.i++ % 10);
@Override
public int maxRetryCount() {
return 3;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册