提交 9878c120 编写于 作者: 冰 河's avatar 冰 河

提交自定义线程池源码

上级 6018f9eb
......@@ -16,7 +16,8 @@
package io.binghe.concurrent.policy;
import io.binghe.concurrent.queue.BlockingQueue;
import io.binghe.concurrent.queue.CustomQueue;
import io.binghe.concurrent.threadpool.CustomThreadPoolExecutor;
/**
* @author binghe(公众号:冰河技术)
......@@ -24,10 +25,10 @@ import io.binghe.concurrent.queue.BlockingQueue;
* @description 拒绝策略接口
*/
@FunctionalInterface
public interface RejectHandler<T> {
public interface RejectHandler {
/**
* 拒绝任务回调接口
*/
void reject(BlockingQueue<T> queue, T task);
void reject(Runnable r, CustomThreadPoolExecutor executor);
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.concurrent.queue;
import io.binghe.concurrent.policy.RejectHandler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 阻塞队列实现类
*/
public class CustomBlockingQueue<T> implements CustomQueue<T> {
//ArrayBlockingQueue 队列
private BlockingQueue<T> queue;
//ReentrantLock锁
private ReentrantLock lock = new ReentrantLock();
//消费者可以消费
private Condition notEmpty = lock.newCondition();
//生产者可以生产
private Condition notFull = lock.newCondition();
//队列容量
private int capcity;
//拒绝策略
private RejectHandler rejectHandler;
public CustomBlockingQueue(int capcity){
if (capcity <= 0){
throw new IllegalArgumentException();
}
this.capcity = capcity;
this.queue = new ArrayBlockingQueue<T>(capcity);
}
@Override
public T poll(long timeout, TimeUnit timeUnit) {
lock.lock();
try{
//转化为纳秒
long nanos = timeUnit.toNanos(timeout);
//是否为空
while (queue.isEmpty()){
if (nanos <= 0){
return null;
}
nanos = notEmpty.awaitNanos(nanos);
}
T t = queue.remove();
notFull.signal();
return t;
}catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}
@Override
public T take() {
lock.lock();
try{
while (queue.isEmpty()){
notEmpty.await();
}
T t = queue.remove();
notFull.signal();
return t;
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return null;
}
@Override
public void put(T task) {
lock.lock();
try{
while (queue.size() == capcity){
notFull.await();
}
queue.add(task);
notEmpty.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
@Override
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try{
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity){
if (nanos <= 0){
return false;
}
nanos = notFull.awaitNanos(nanos);
}
queue.add(task);
notEmpty.signal();
return true;
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
return false;
}
@Override
public int size() {
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}
@Override
public int capcity() {
return capcity;
}
}
......@@ -20,9 +20,9 @@ import java.util.concurrent.TimeUnit;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 自定义阻塞队列接口
* @description 队列接口
*/
public interface BlockingQueue<T> {
public interface CustomQueue<T> {
/**
* 超时获取并移除队列数据
......@@ -43,11 +43,15 @@ public interface BlockingQueue<T> {
/**
* 向队列中超时添加元素
*/
boolean offer(T task, long time, TimeUnit timeUnit);
boolean offer(T task, long timeout, TimeUnit timeUnit);
/**
* 返回队列中元素的个数
*/
int size();
/**
* 对列容量
*/
int capcity();
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.concurrent.test;
import io.binghe.concurrent.queue.CustomBlockingQueue;
import io.binghe.concurrent.threadpool.CustomThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 测试自定义线程池
*/
public class CustomThreadPoolExecutorTest {
public static void main(String[] args){
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(3, 6, 5, TimeUnit.SECONDS, new CustomBlockingQueue<>(8));
IntStream.rangeClosed(1, 100).forEach((i) -> {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName() + "===>>> 正在执行任务");
});
});
}
}
......@@ -12,14 +12,18 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @author binghe(公众号 : 冰河技术)
* @description
* @version 1.0.0
*/
package io.binghe.concurrent.threadpool;
/**
* @author binghe(公众号 : 冰河技术)
* @description
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 自定义线程池接口
*/
package io.binghe.concurrent;
\ No newline at end of file
public interface CustomExecutor {
/**
* 执行任务
*/
void execute(Runnable task);
}
/**
* Copyright 2020-9999 the original author or authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.binghe.concurrent.threadpool;
import io.binghe.concurrent.policy.RejectHandler;
import io.binghe.concurrent.queue.CustomBlockingQueue;
import io.binghe.concurrent.queue.CustomQueue;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @author binghe(公众号:冰河技术)
* @version 1.0.0
* @description 自定义线程池
*/
public class CustomThreadPoolExecutor implements CustomExecutor {
//任务对列
private CustomQueue<Runnable> workQueue;
//线程集合
private Set<WorkerThread> workerThreads = new HashSet<>();
//线程核心数
private int coreSize;
//最大线程数
private int maximumSize;
//获取任务超时时间
private long timeout;
//获取任务超时时间的单位
private TimeUnit timeUnit;
//拒绝策略
private RejectHandler rejectHandler;
public CustomThreadPoolExecutor(int coreSize, int maximumSize, long timeout, TimeUnit timeUnit, CustomQueue<Runnable> workQueue) {
if (coreSize <= 0 || maximumSize <= 0 || maximumSize < coreSize){
throw new IllegalArgumentException();
}
this.coreSize = coreSize;
this.maximumSize = maximumSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectHandler = new AbortPolicy();
this.workQueue = workQueue;
}
public CustomThreadPoolExecutor(int coreSize, int maximumSize, long timeout, TimeUnit timeUnit, RejectHandler rejectHandler, CustomQueue<Runnable> workQueue) {
if (coreSize <= 0 || maximumSize <= 0 || maximumSize < coreSize){
throw new IllegalArgumentException();
}
this.coreSize = coreSize;
this.maximumSize = maximumSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectHandler = rejectHandler;
this.workQueue = workQueue;
}
@Override
public void execute(Runnable task) {
synchronized (workerThreads){
//获取任务数,也就是线程池中目前存在的线程数
int taskCount = workerThreads.size();
//1.任务数没有超过coreSize,创建workerThread执行
if (taskCount < coreSize){
createNewThread(task);
}else if (taskCount < maximumSize){ //2.任务数量大于等于coreSize,并且小于maximumSize
//目前对列还有容量,向队列中添加元素
if (workQueue.size() < workQueue.capcity()){ //2.1 队列不满,相队列中添加元素
workQueue.put(task);
}else{ //2.2 队列已满,继续创建新的线程执行任务
createNewThread(task);
}
}else{ //3.队列已满,同时达到最大线程数,则执行拒绝策略
rejectHandler.reject(task, this);
}
}
}
public CustomQueue<Runnable> getQueue() {
return workQueue;
}
private void createNewThread(Runnable task){
WorkerThread workerThread = new WorkerThread(task);
workerThreads.add(workerThread);
workerThread.start();
}
//用提交任务的线程执行任务
public static class CallerRunsPolicy implements RejectHandler{
@Override
public void reject(Runnable r, CustomThreadPoolExecutor e) {
r.run();
}
}
//直接抛出异常
public static class AbortPolicy implements RejectHandler{
@Override
public void reject(Runnable r, CustomThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
//忽略任务
public static class DiscardPolicy implements RejectHandler {
@Override
public void reject(Runnable r, CustomThreadPoolExecutor e) {
}
}
//移除队列中存放最久的任务
public static class DiscardOldestPolicy implements RejectHandler{
@Override
public void reject(Runnable r, CustomThreadPoolExecutor e) {
e.getQueue().take();
e.execute(r);
}
}
class WorkerThread extends Thread{
private Runnable task;
public WorkerThread(Runnable task){
this.task = task;
}
@Override
public void run() {
while (task != null || (task = workQueue.poll(timeout, timeUnit)) != null){
try{
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workerThreads){
workerThreads.remove(this);
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册