提交 8faa3564 编写于 作者: D Daming 提交者: wu-sheng

Using ArrayList to instead of LinkedList (#3093)

* Using ArrayList to instead of LinkedList
上级 f84e1de5
......@@ -18,10 +18,12 @@
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import java.util.LinkedList;
import java.util.List;
/**
* Created by wusheng on 2016/10/25.
*/
......@@ -79,19 +81,17 @@ public class Buffer<T> {
return buffer.length;
}
public LinkedList<T> obtain() {
return this.obtain(0, buffer.length);
public void obtain(List<T> consumeList) {
this.obtain(consumeList, 0, buffer.length);
}
public LinkedList<T> obtain(int start, int end) {
LinkedList<T> result = new LinkedList<T>();
public void obtain(List<T> consumeList, int start, int end) {
for (int i = start; i < end; i++) {
if (buffer[i] != null) {
result.add((T)buffer[i]);
consumeList.add((T)buffer[i]);
buffer[i] = null;
}
}
return result;
}
}
......@@ -19,10 +19,11 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import java.util.ArrayList;
import java.util.List;
/**
* Created by wusheng on 2016/10/25.
*/
......@@ -36,7 +37,7 @@ public class ConsumerThread<T> extends Thread {
super(threadName);
this.consumer = consumer;
running = false;
dataSources = new LinkedList<DataSource>();
dataSources = new ArrayList<DataSource>(1);
this.consumeCycle = consumeCycle;
}
......@@ -64,10 +65,9 @@ public class ConsumerThread<T> extends Thread {
public void run() {
running = true;
final List<T> consumeList = new ArrayList<T>(1500);
while (running) {
boolean hasData = consume();
if (!hasData) {
if (!consume(consumeList)) {
try {
Thread.sleep(consumeCycle);
} catch (InterruptedException e) {
......@@ -77,31 +77,27 @@ public class ConsumerThread<T> extends Thread {
// consumer thread is going to stop
// consume the last time
consume();
consume(consumeList);
consumer.onExit();
}
private boolean consume() {
boolean hasData = false;
LinkedList<T> consumeList = new LinkedList<T>();
private boolean consume(List<T> consumeList) {
for (DataSource dataSource : dataSources) {
LinkedList<T> data = dataSource.obtain();
if (data.size() == 0) {
continue;
}
consumeList.addAll(data);
hasData = true;
dataSource.obtain(consumeList);
}
if (consumeList.size() > 0) {
if (!consumeList.isEmpty()) {
try {
consumer.consume(consumeList);
} catch (Throwable t) {
consumer.onError(consumeList, t);
} finally {
consumeList.clear();
}
return true;
}
return hasData;
return false;
}
void shutdown() {
......@@ -122,8 +118,8 @@ public class ConsumerThread<T> extends Thread {
this.end = end;
}
LinkedList<T> obtain() {
return sourceBuffer.obtain(start, end);
void obtain(List<T> consumeList) {
sourceBuffer.obtain(consumeList, start, end);
}
}
}
......@@ -18,8 +18,11 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import java.util.ArrayList;
import java.util.List;
/**
* MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
......@@ -43,10 +46,12 @@ public class MultipleChannelsConsumer extends Thread {
public void run() {
running = true;
final List consumeList = new ArrayList(2000);
while (running) {
boolean hasData = false;
for (Group target : consumeTargets) {
hasData = hasData || consume(target);
boolean consume = consume(target, consumeList);
hasData = hasData || consume;
}
if (!hasData) {
......@@ -55,34 +60,34 @@ public class MultipleChannelsConsumer extends Thread {
} catch (InterruptedException e) {
}
}
}
// consumer thread is going to stop
// consume the last time
for (Group target : consumeTargets) {
consume(target);
consume(target, consumeList);
target.consumer.onExit();
}
}
private boolean consume(Group target) {
boolean hasData;
LinkedList consumeList = new LinkedList();
private boolean consume(Group target, List consumeList) {
for (int i = 0; i < target.channels.getChannelSize(); i++) {
Buffer buffer = target.channels.getBuffer(i);
consumeList.addAll(buffer.obtain());
buffer.obtain(consumeList);
}
if (hasData = consumeList.size() > 0) {
if (!consumeList.isEmpty()) {
try {
target.consumer.consume(consumeList);
} catch (Throwable t) {
target.consumer.onError(consumeList, t);
} finally {
consumeList.clear();
}
return true;
}
return hasData;
return false;
}
/**
......
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.commons.datacarrier;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
......@@ -65,13 +66,15 @@ public class DataCarrierTest {
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
List result1 = buffer1.obtain(0, 100);
List result = new ArrayList();
buffer1.obtain(result, 0, 100);
Assert.assertEquals(2, result.size());
Buffer<SampleData> buffer2 = channels.getBuffer(1);
List result2 = buffer2.obtain(0, 100);
buffer2.obtain(result, 0, 100);
Assert.assertEquals(2, result1.size());
Assert.assertEquals(4, result1.size() + result2.size());
Assert.assertEquals(4, result.size());
}
......@@ -86,11 +89,12 @@ public class DataCarrierTest {
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
List result1 = buffer1.obtain(0, 100);
List result = new ArrayList();
buffer1.obtain(result, 0, 100);
Buffer<SampleData> buffer2 = channels.getBuffer(1);
List result2 = buffer2.obtain(0, 100);
Assert.assertEquals(200, result1.size() + result2.size());
buffer2.obtain(result, 0, 100);
Assert.assertEquals(200, result.size());
}
@Test
......@@ -108,11 +112,12 @@ public class DataCarrierTest {
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
List result1 = buffer1.obtain(0, 100);
List result = new ArrayList();
buffer1.obtain(result, 0, 100);
Buffer<SampleData> buffer2 = channels.getBuffer(1);
List result2 = buffer2.obtain(0, 100);
Assert.assertEquals(200, result1.size() + result2.size());
buffer2.obtain(result, 0, 100);
Assert.assertEquals(200, result.size());
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册