最新公告
  • 欢迎您光临欧资源网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入我们
  • java线程系列之自己动手写一个线程池

    java线程系列之自己动手写一个线程池 最后编辑:2021-05-27
    增值服务: 自动发货 使用说明 安装指导 环境配置二次开发BUG修复

    mythreadpool

    机横屏看源码更方便)


    (1)自己动手写一个线程池需要考虑哪些因素?

    (2)自己动手写的线程池如何测试?

    线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池。

    线程池,顾名思义它首先是一个“池”,这个池里面放的是线程,线程是用来执行任务的。

    首先,线程池中的线程应该是有类别的,有的是核心线程,有的是非核心线程,所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。

    为什么要区分是否为核心线程呢?这是为了控制系统中线程的数量。

    当线程池中线程数未达到核心线程数coreSize时,来一个任务加一个线程是可以的,也可以提高任务执行的效率。

    当线程池中线程数达到核心线程数后,得控制一下线程的数量,来任务了先进队列,如果任务执行足够快,这些核心线程很快就能把队列中的任务执行完毕,完全没有新增线程的必要。

    当队列中任务也满了,这时候光靠核心线程就无法及时处理任务了,所以这时候就需要增加新的线程了,但是线程也不能无限制地增加,所以需要控制其最大线程数量maxSize。

    其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,我们一般使用BlockingQueue阻塞队列来充当,当然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞队列,不能运用在jdk的线程池中)。

    最后,当任务越来越多而线程处理却不及时,迟早会达到一种状态,队列满了,线程数也达到最大线程数了,这时候怎么办呢?这时候就需要走拒绝策略了,也就是这些无法及时处理的任务怎么办的一种策略,常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。

    根据上面的描述,我们定义一个线程池一共需要这么四个变量:核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

    另外,为了便于给线程池一个名称,我们再加一个变量:线程池的名称name。

    所以我们得出了线程池的属性及构造方法大概如下:

    public class MyThreadPoolExecutor implements Executor {
        /**
         * 线程池的名称
         */
        private String name;
        /**
         * 核心线程数
         */
        private int coreSize;
        /**
         * 最大线程数
         */
        private int maxSize;
        /**
         * 任务队列
         */
        private BlockingQueue<runnable> taskQueue;
        /**
         * 拒绝策略
         */
        private RejectPolicy rejectPolicy;
    
        public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<runnable> taskQueue, RejectPolicy rejectPolicy) {
            this.name = name;
            this.coreSize = coreSize;
            this.maxSize = maxSize;
            this.taskQueue = taskQueue;
            this.rejectPolicy = rejectPolicy;
        }
    }
    

    根据上面的属性分析,基本上我们已经得到了任务流向的完整逻辑:

    首先,如果运行的线程数小于核心线程数,直接创建一个新的核心线程来运行新的任务。

    其次,如果运行的线程数达到了核心线程数,则把新任务入队列。

    然后,如果队列也满了,则创建新的非核心线程来运行新的任务。

    最后,如果非核心线程数也达到最大了,那就执行拒绝策略。

    mythreadpool

    代码逻辑大致如下:

        @Override
        public void execute(Runnable task) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 如果正在运行的线程数小于核心线程数,直接加一个线程
            if (count < coreSize) {
                // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
                if (addWorker(task, true)) {
                    return;
                }
                // 如果添加核心线程失败,进入下面的逻辑
            }
    
            // 如果达到了核心线程数,先尝试让任务入队
            // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
            if (taskQueue.offer(task)) {
                // do nothing,为了逻辑清晰这里留个空if
                // 【本篇文章由公众号“彤哥读源码”原创】
            } else {
                // 如果入队失败,说明队列满了,那就添加一个非核心线程
                if (!addWorker(task, false)) {
                    // 如果添加非核心线程失败了,那就执行拒绝策略
                    rejectPolicy.reject(task, this);
                }
            }
        }
    

    首先,创建线程的依据是正在运行的线程数量有没有达到核心线程数或者最大线程数,所以我们还需要一个变量runningCount用来记录正在运行的线程数。

    其次,这个变量runningCount需要在并发环境下加加减减,所以这里需要使用到Unsafe的CAS指令来控制其值的修改,用了CAS就要给这个变量加上volatile修饰,为了方便我们这里直接使用AtomicInteger来作为这个变量的类型。

    然后,因为是并发环境中,所以需要判断runningCount < coreSize(或maxSize)(条件一)的同时修改runningCount CAS加一(条件二)成功了才表示可以增加一个线程,如果条件一失败则表示不能再增加线程了直接返回false,如果条件二失败则表示其它线程先修改了runningCount的值,则重试。

    最后,创建一个线程并运行新任务,且不断从队列中拿任务来运行【本篇文章由公众号“彤哥读源码”原创】。

    mythreadpool

    代码逻辑如下:

        private boolean addWorker(Runnable newTask, boolean core) {
            // 自旋判断是不是真的可以创建一个线程
            for (; ; ) {
                // 正在运行的线程数
                int count = runningCount.get();
                // 核心线程还是非核心线程
                int max = core ? coreSize : maxSize;
                // 不满足创建线程的条件,直接返回false
                if (count >= max) {
                    return false;
                }
                // 修改runningCount成功,可以创建线程
                if (runningCount.compareAndSet(count, count + 1)) {
                    // 线程的名字
                    String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                    // 创建线程并启动
                    new Thread(() -> {
                        System.out.println("thread name: " + Thread.currentThread().getName());
                        // 运行的任务
                        Runnable task = newTask;
                        // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                        while (task != null || (task = getTask()) != null) {
                            try {
                                // 执行任务
                                task.run();
                            } finally {
                                // 任务执行完成,置为空
                                task = null;
                            }
                        }
                    }, threadName).start();
    
                    break;
                }
            }
    
            return true;
        }
    

    从队列中取任务应该使用take()方法,这个方法会一直阻塞直至取到任务或者中断,如果中断了就返回null,这样当前线程也就可以安静地结束了,另外还要注意中断了记得把runningCount减一。

        private Runnable getTask() {
            try {
                // take()方法会一直阻塞直到取到任务为止
                return taskQueue.take();
            } catch (InterruptedException e) {
                // 线程中断了,返回null可以结束当前线程
                // 当前线程都要结束了,理应要把runningCount的数量减一
                runningCount.decrementAndGet();
                return null;
            }
        }
    

    好了,到这里我们自己的线程池就写完了,下面我们一起来想想怎么测试呢?

    我们再来回顾下自己的写的线程池的构造方法:

        public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<runnable> taskQueue, RejectPolicy rejectPolicy) {
            this.name = name;
            this.coreSize = coreSize;
            this.maxSize = maxSize;
            this.taskQueue = taskQueue;
            this.rejectPolicy = rejectPolicy;
        }
    

    name,这个随便传;

    coreSize,我们假设为5;

    maxSize,我们假设为10;

    taskQueue,任务队列,既然我们设置的是有边界的,我们就用最简单的ArrayBlockingQueue好吧,容量设置为15,这样里面最多可以存储15条任务;

    rejectPolicy,拒绝策略,我们假设使用丢弃当前任务的策略,OK,我们来实现一个。

    /**
     * 丢弃当前任务
     */
    public class DiscardRejectPolicy implements RejectPolicy {
        @Override
        public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
            // do nothing
            System.out.println("discard one task");
        }
    }
    

    OK,这样一个线程池就创建完成了,下面就是执行任务了,我们假设通过for循环连续不断地添加100个任务好不好。

    public class MyThreadPoolExecutorTest {
        public static void main(String[] args) {
            Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
            AtomicInteger num = new AtomicInteger(0);
    
            for (int i = 0; i < 100; i++) {
                threadPool.execute(()->{
                    try {
                        Thread.sleep(1000);
                        System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
    

    我们分析下这段程序:

    (1)先连续创建了5个核心线程,并执行了新任务;

    (2)后面的15个任务进了队列;

    (3)队列满了,又连续创建了5个线程,并执行了新任务;

    (4)后面的任务就没得执行了,全部走了丢弃策略;

    (5)所以真正执行成功的任务应该是 5 + 15 + 5 = 25 条任务;

    运行之:

    thread name: core_test2
    thread name: core_test5
    thread name: core_test3
    thread name: core_test4
    thread name: core_test1
    thread name: test6
    thread name: test7
    thread name: test8
    thread name: test9
    discard one task
    thread name: test10
    discard one task
    ...省略被拒绝的任务
    【本篇文章由公众号“彤哥读源码”原创】
    discard one task
    running: 1570546871851: 2
    running: 1570546871851: 8
    running: 1570546871851: 7
    running: 1570546871851: 6
    running: 1570546871851: 5
    running: 1570546871851: 3
    running: 1570546871851: 4
    running: 1570546871851: 1
    running: 1570546871851: 10
    running: 1570546871851: 9
    running: 1570546872852: 14
    running: 1570546872852: 20
    running: 1570546872852: 19
    running: 1570546872852: 17
    running: 1570546872852: 18
    running: 1570546872852: 16
    running: 1570546872852: 15
    running: 1570546872852: 12
    running: 1570546872852: 13
    running: 1570546872852: 11
    running: 1570546873852: 21
    running: 1570546873852: 24
    running: 1570546873852: 23
    running: 1570546873852: 25
    running: 1570546873852: 22
    

    可以看到,创建了5个核心线程、5个非核心线程,成功执行了25条任务,完成没问题,完美^^。

    (1)自己动手写一个线程池需要考虑的因素主要有:核心线程数、最大线程数、任务队列、拒绝策略。

    (2)创建线程的时候要时刻警惕并发的陷阱;

    我们知道,jdk自带的线程池还有两个参数:keepAliveTime、unit,它们是干什么的呢?

    答:它们是用来控制何时销毁非核心线程的,当然也可以销毁核心线程,具体的分析请期待下一章吧。

    public interface Executor {
        void execute(Runnable command);
    }
    
    /**
     * 自动动手写一个线程池
     */
    public class MyThreadPoolExecutor implements Executor {
    
        /**
         * 线程池的名称
         */
        private String name;
        /**
         * 线程序列号
         */
        private AtomicInteger sequence = new AtomicInteger(0);
        /**
         * 核心线程数
         */
        private int coreSize;
        /**
         * 最大线程数
         */
        private int maxSize;
        /**
         * 任务队列
         */
        private BlockingQueue<runnable> taskQueue;
        /**
         * 拒绝策略
         */
        private RejectPolicy rejectPolicy;
        /**
         * 当前正在运行的线程数【本篇文章由公众号“彤哥读源码”原创】
         * 需要修改时线程间立即感知,所以使用AtomicInteger
         * 或者也可以使用volatile并结合Unsafe做CAS操作(参考Unsafe篇章讲解)
         */
        private AtomicInteger runningCount = new AtomicInteger(0);
    
        public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<runnable> taskQueue, RejectPolicy rejectPolicy) {
            this.name = name;
            this.coreSize = coreSize;
            this.maxSize = maxSize;
            this.taskQueue = taskQueue;
            this.rejectPolicy = rejectPolicy;
        }
    
        @Override
        public void execute(Runnable task) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 如果正在运行的线程数小于核心线程数,直接加一个线程
            if (count < coreSize) {
                // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
                if (addWorker(task, true)) {
                    return;
                }
                // 如果添加核心线程失败,进入下面的逻辑
            }
    
            // 如果达到了核心线程数,先尝试让任务入队
            // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
            if (taskQueue.offer(task)) {
                // do nothing,为了逻辑清晰这里留个空if
            } else {
                // 如果入队失败,说明队列满了,那就添加一个非核心线程
                if (!addWorker(task, false)) {
                    // 如果添加非核心线程失败了,那就执行拒绝策略
                    rejectPolicy.reject(task, this);
                }
            }
        }
    
        private boolean addWorker(Runnable newTask, boolean core) {
            // 自旋判断是不是真的可以创建一个线程
            for (; ; ) {
                // 正在运行的线程数
                int count = runningCount.get();
                // 核心线程还是非核心线程
                int max = core ? coreSize : maxSize;
                // 不满足创建线程的条件,直接返回false
                if (count >= max) {
                    return false;
                }
                // 修改runningCount成功,可以创建线程
                if (runningCount.compareAndSet(count, count + 1)) {
                    // 线程的名字
                    String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                    // 创建线程并启动
                    new Thread(() -> {
                        System.out.println("thread name: " + Thread.currentThread().getName());
                        // 运行的任务【本篇文章由公众号“彤哥读源码”原创】
                        Runnable task = newTask;
                        // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                        while (task != null || (task = getTask()) != null) {
                            try {
                                // 执行任务
                                task.run();
                            } finally {
                                // 任务执行完成,置为空
                                task = null;
                            }
                        }
                    }, threadName).start();
    
                    break;
                }
            }
    
            return true;
        }
    
        private Runnable getTask() {
            try {
                // take()方法会一直阻塞直到取到任务为止
                return taskQueue.take();
            } catch (InterruptedException e) {
                // 线程中断了,返回null可以结束当前线程
                // 当前线程都要结束了,理应要把runningCount的数量减一
                runningCount.decrementAndGet();
                return null;
            }
        }
    
    }
    
    public interface RejectPolicy {
        void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
    }
    
    /**
     * 丢弃当前任务
     */
    public class DiscardRejectPolicy implements RejectPolicy {
        @Override
        public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
            // do nothing
            System.out.println("discard one task");
        }
    }
    
    public class MyThreadPoolExecutorTest {
        public static void main(String[] args) {
            Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
            AtomicInteger num = new AtomicInteger(0);
    
            for (int i = 0; i < 100; i++) {
                threadPool.execute(()->{
                    try {
                        Thread.sleep(1000);
                        System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
    
        }
    }

    猜你在找

    站内大部分资源收集于网络,若侵犯了您的合法权益,请联系我们删除!
    欧资源网 » java线程系列之自己动手写一个线程池

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    欧资源网
    一个高级程序员模板开发平台
    • 2020-02-15Hi,初次和大家见面了,请多关照!

    发表评论

    售后服务:

    • 售后服务范围 1、商业模板使用范围内问题免费咨询
      2、源码安装、模板安装(一般 ¥50-300)服务答疑仅限SVIP用户
      3、单价超过200元的模板免费一次安装,需提供服务器信息。
      付费增值服务 1、提供dedecms模板、WordPress主题、discuz模板优化等服务请详询在线客服
      2、承接 WordPress、DedeCMS、Discuz 等系统建站、仿站、开发、定制等服务
      3、服务器环境配置(一般 ¥50-300)
      4、网站中毒处理(需额外付费,500元/次/质保三个月)
      售后服务时间 周一至周日(法定节假日除外) 9:00-23:00
      免责声明 本站所提供的模板(主题/插件)等资源仅供学习交流,若使用商业用途,请购买正版授权,否则产生的一切后果将由下载用户自行承担,有部分资源为网上收集或仿制而来,若模板侵犯了您的合法权益,请来信通知我们(Email: 599398545@qq.com),我们会及时删除,给您带来的不便,我们深表歉意!

    Hi, 如果你对这款模板有疑问,可以跟我联系哦!

    联系作者