手写线程池
大约 3 分钟Juc
如果不使用线程池而是每一次都创建一个线程的问题:
- 线程上下文切换成本高
- cpu核数有限,资源有限
线程池组件
- 线程池
- 阻塞队列(线程空闲时进行等待,任务过多时进行阻塞等待,平衡生产者线程和消费者线程)
阻塞队列的实现(需要准备一个队列(容量),锁,条件变量用来存放阻塞的线程,实现添加(生产者入队)和获取方法(消费者出队列))
class BlockingQueue<T>{
private ArrayDeque<T> queue;
private ReentrantLock lock;
private Condition fullWaitSet=lock.newCondition();
private Condition emptyWaitSet=lock.newCondition();
private int capacity;
public void offer(T element){
//对于队列尾部共享资源进行加锁添加
lock.tryLock();
try{
//如果队列已经满了,就需要进行阻塞等待
while(queue.size()==capacity){
fullWaitSet.await();
}
//把元素添加到队列尾部
queue.addLast(element);
//唤醒空闲线程进行获取资源
emptyWaitSet.signal();
}catch(Exception e){
e.print();
}finally{
lock.unlock();
}
}
//超时等待任务(类似于tryLock()方法,利用Condition条件队列的awaitNanos(long nanos)方法实现超时等待)
public T tryPoll(long timeout,TimeUnit unit){
//对于队首元素进行阻塞获取
lock.lock();
try{
//把要等待的时间变成毫秒
long nanos=unit.toNanos(timeout);
while(queue.isEmpty()){
//awaitNanos()方法的返回值是剩余的等待时间
nanos=emptyWaitSet.awaitNanos(nanos);
//如果超过了等待时间就返回空
if(nanos<0){
return null;
}
}
//如果队列不为空就获取
T t=queue.removeFirst();
//唤醒生产者线程
fullWaitSet.signal();
return t;s
}catch(){
}finally{
lock.unlock();
}
}
//阻塞获取任务(类似于lock()方法)
public T poll(){
//对于队首共享资源进行加锁获取
lock.tryLock();
try{
//如果队列是空就进行阻塞等待
while(queue.isEmpty()){
emptyWaitSet.await();
}
//获取队首元素
T t=queue.removeFirst();
//唤醒生产者线程进行队列添加
fullWaitSet.signal();
return t;
}catch(Exception e){
e.print();
}finally{
lock.unlock();
}
}
public int size(){
//对于共享资源队列进行加锁获取大小
lock.lock();
try{
return queue.size();
}catch(){
}finally{
lock.unlock();
}
}
}线程池的实现(阻塞队列,线程池,核心线程数,阻塞获取时间,阻塞获取时间单位,执行任务方法,Worker的run方法)
class ThreadPool{
private BlockingQueue<T> blockingQueue;
//线程池,Worker类对线程进行封装
private HashSet<Worker> workers;
int coreSize;
long timeout;
TimeUnit unit;
//构造函数
public ThreadPool(int coresize){
workers=new HashSet<Worker>(coresize);
}
//执行任务逻辑
public void execute(Runnable task){
//由于workers是共享资源,所以要加锁
synchornized(workers){
//如果workers集合还没有满,就直接创建一个Worker执行,并把这个Worker加入集合
if(workers.size()<coreSize){
Worker worker=new Worker(task);
workers.add(worker);
worker.start();
}
//如果workers集合已经满了,就加入阻塞对了进行等待
queue.offer(task);
}
}
class Worker extends Thread{
private Runnable task;
//构造函数
public Worker(Runnable task){
this.task=task;
}
//进行工作
@Override
public void run(){
//如果当前有任务,或者阻塞队列不为空,就进行工作
while(task!=null||!queue.isEmpty()){
try{
//当前有任务的情况
if(task!=null){
task.run();
}
//当前没有任务,但阻塞队列不为空
task=queue.poll();
}
}catch(){
}finally{
//执行完成之后需要释放这个任务
task=null;
}
//否则就回收移除这个Worker对象
workers.remove(this);
worker=null;
}
}
}拒绝策略