提交 f4ab7b53 编写于 作者: A ascrutae

Merge remote-tracking branch 'origin/master'

......@@ -5,8 +5,6 @@ import static com.ai.cloud.skywalking.conf.Config.Consumer.CONSUMER_FAIL_RETRY_W
import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_CONSUMER;
import static com.ai.cloud.skywalking.conf.Config.Consumer.MAX_WAIT_TIME;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -14,12 +12,13 @@ import com.ai.cloud.skywalking.conf.Config;
import com.ai.cloud.skywalking.conf.Constants;
import com.ai.cloud.skywalking.protocol.Span;
import com.ai.cloud.skywalking.sender.DataSenderFactoryWithBalance;
import com.ai.cloud.skywalking.util.AtomicRangeInteger;
public class BufferGroup {
private static Logger logger = LogManager.getLogger(BufferGroup.class);
private String groupName;
private Span[] dataBuffer = new Span[BUFFER_MAX_SIZE];
AtomicInteger index = new AtomicInteger(0);
AtomicRangeInteger index = new AtomicRangeInteger(0, BUFFER_MAX_SIZE);
public BufferGroup(String groupName) {
this.groupName = groupName;
......@@ -38,7 +37,7 @@ public class BufferGroup {
}
public void save(Span span) {
int i = Math.abs(index.getAndIncrement() % BUFFER_MAX_SIZE);
int i = index.getAndIncrement();
if (dataBuffer[i] != null) {
logger.warn(
"Group[{}] index[{}] data collision, discard old data.",
......
package com.ai.cloud.skywalking.util;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 原子性,带范围性自增的整数
*
* @author wusheng
*
*/
public class AtomicRangeInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = -4099792402691141643L;
private AtomicInteger value;
private int startValue;
private int endValue;
/**
* Creates a new AtomicInteger with the given initial value and max value
*
* @param initialValue
* the initial value
*/
public AtomicRangeInteger(int startValue, int endValue) {
value = new AtomicInteger(startValue);
this.startValue = startValue;
this.endValue = endValue;
}
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
for (;;) {
int current = get();
int next = current + 1;
if (next >= this.endValue) {
next = this.startValue;
}
if (value.compareAndSet(current, next))
return current;
}
}
public final int get() {
return value.get();
}
public int intValue() {
return value.intValue();
}
public long longValue() {
return value.longValue();
}
public float floatValue() {
return value.floatValue();
}
public double doubleValue() {
return value.doubleValue();
}
}
package test.ai.cloud.skywalking.util;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import junit.framework.TestCase;
import com.ai.cloud.skywalking.util.AtomicRangeInteger;
public class AtomicRangeIntegerTest extends TestCase{
public void testGet(){
AtomicRangeInteger ari = new AtomicRangeInteger(0, 12);
for(int i = 0; i < 51; i++){
System.out.print(ari.getAndIncrement() + ";");
}
}
public void testMultiThreads(){
List<RangeIntegerThread> tlist = new ArrayList<RangeIntegerThread>();
for(int i = 0; i < 20; i++){
RangeIntegerThread t = new RangeIntegerThread();
tlist.add(t);
t.start();
}
for(int i = 0; i < tlist.size(); i++){
try {
tlist.get(i).join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class RangeIntegerThread extends Thread{
private static String[] buffer = new String[500000000];
private static AtomicRangeInteger ari = new AtomicRangeInteger(0, buffer.length);
@Override
public void run(){
while(true){
int i = ari.getAndIncrement();
if(i % 10000000 == 0){
System.out.println(ari.get());
}
if(i >= buffer.length - 100000){
break;
}
Assert.assertNull(buffer[i]);
buffer[i] = "string";
}
}
}
package com.ai.cloud.skywalking.reciever.buffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.DATA_BUFFER_FILE_PARENT_DIRECTORY;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.DATA_CONFLICT_WAIT_TIME;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.DATA_FILE_MAX_LENGTH;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.FLUSH_NUMBER_OF_CACHE;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.MAX_WAIT_TIME;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.PER_THREAD_MAX_BUFFER_NUMBER;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.WRITE_DATA_FAILURE_RETRY_INTERVAL;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHealthCollector;
import com.ai.cloud.skywalking.reciever.selfexamination.ServerHeathReading;
import com.ai.cloud.skywalking.util.AtomicRangeInteger;
public class DataBufferThread extends Thread {
......@@ -21,7 +27,7 @@ public class DataBufferThread extends Thread {
private byte[][] data = new byte[PER_THREAD_MAX_BUFFER_NUMBER][];
private File file;
private FileOutputStream outputStream;
private AtomicInteger index = new AtomicInteger();
private AtomicRangeInteger index = new AtomicRangeInteger(0, PER_THREAD_MAX_BUFFER_NUMBER);
public DataBufferThread(int threadIdx) {
super("DataBufferThread_" + threadIdx);
......@@ -142,7 +148,7 @@ public class DataBufferThread extends Thread {
}
public void saveTemporarily(byte[] s) {
int i = Math.abs(index.getAndIncrement() % data.length);
int i = index.getAndIncrement();
while (data[i] != null) {
try {
ServerHealthCollector.getCurrentHeathReading(null).updateData(ServerHeathReading.WARNING, "DataBuffer index[" + i + "] data collision, service pausing. ");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册