博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java线程池框架源代码分析
阅读量:5337 次
发布时间:2019-06-15

本文共 8885 字,大约阅读时间需要 29 分钟。

相关类Executor,Executors。AbstractExecutorService。ExecutorService

Executor:整个线程池运行者框架的顶层接口。

定义了一个execute方法。整个线程运行者框架的核心方法。

public interface Executor {    void execute(Runnable command);}
ExecutorService:这是一个接口它继承自Executor,定义了shutdown。shutdownNow,awaitTermination,submit。invokeAll等方法。

AbstractExecutorService:实现了ExecutorService接口中的submit,invokeAll的方法。

public Future

> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }

      在这里。全部submit方法提交的任务终于还是调用了execute方法。execute是接口Executor中定义的方法,AbstractExecutorService没有实现它,

须要子类去实现这种方法。ThreadPoolExecutor继承了AbstractExecutorService,它实现了execute方法。ScheduledThreadPoolExecutor继承自

ThreadPoolExecutor,并覆盖了ThreadPoolExecutor的execute方法。这种方法是线程运行框者架的核心逻辑,不同的线程池运行者有不同的实现逻辑。

     AbstractExecutorService的功能较为简单。实现了不同參数的submit。invokeAll方法。

ThreadPoolExecutor线程池运行者:它有一个核心的成员变量:

        private final HashSet<Worker> workers = new HashSet<Worker>();

    workers能够看做是ThreadPoolExecutor中用于执行任务的线程池。

    worker是一个封装了一个Thread对象并实现了Runnable接口的类。

封装Thread非常easy理解,由于它要利用Thread去执行execute方法提交过来的runnable任务。

可是为什么会继承runnable接口呢?

以下是剔除了部分代码的Worker源代码:

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {    	 final Thread thread;                Runnable firstTask;        Worker(Runnable firstTask) {            setState(-1);             this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        public void run() {            runWorker(this);        }    }
Worker是ThreadPoolExecutor的一个内部类,Worker本身实现了Runnable接口,并封装了一个Thread对象,最后在构造方法中获取了一个Runnable对象,这个对象就是ThreadPoolExecutor通过execute提交过来的目标任务。

跟踪runWorker(this)方法:

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock();         boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();//在这里直接调用了目标任务的run方法,并没有将它传给Thread对象。

} catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

回过头来在看看Worker的构造方法:

Worker(Runnable firstTask) {        setState(-1);         this.firstTask = firstTask;        this.thread = getThreadFactory().newThread(this);    }
它将自己传给了自己的成员变量thread。

目标任务被运行的步骤可能就是:Worker的成员变量thread启动后调用worker的run方法。worker的run方法中将自己传给runWorker,runWorker在调用目标运行对象的run方法。

那么thread是何时被运行的呢?

以下看看ThreadPoolExecutor中的一个其它方法:

private boolean addWorker(Runnable firstTask, boolean core) {       ......        try {            final ReentrantLock mainLock = this.mainLock;            w = new Worker(firstTask);            final Thread t = w.thread;//这里初始化一个Worker对象w。在将w的成员变量thread付给t            if (t != null) {                mainLock.lock();                try {                    int c = ctl.get();                    int rs = runStateOf(c);                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive())                             throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();//在这里调用t的start方法。                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }
这里为什么会设计的这么绕,我想主要是Worker不仅封装了一个thread,并且对目标任务进行了封装。在执行封装过后的目标任务前,addWorker能够做一些相关操作。

这里只介绍了ThreadPoolExecutor的线程池。那么这个线程池是怎样被维护的。以下介绍几个关键的參数。

private volatile int corePoolSize;  private volatile int maximumPoolSize;  private final BlockingQueue
workQueue;
    这三个是ThreadPoolExecutor的成员变量,当中workQueue跟县城池没有关系。

workQueue是一个线程安全的堵塞队列。

    corePoolSize是线程池的核心大小。maximumPoolSize是线程池的最大大小。

    当提交新任务时,假设ThreadPoolExecutor中有线程在执行。而且线程的数量小于corePoolSize,那么就会有新的线程被创建。

假设当前执行的线程数大于corePoolSize,就会放到缓存队列workQueue中。假设缓冲队列也满了。就继续创建线程,直到线程的数量达到maximumPoolSize

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        //推断假设当前执行的线程数小于 corePoolSize,加入新的线程(addWorker会加入一个新的线程。上面有介绍),方法直接返回。        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        //假设当前的执行的线程数大于或等于corePoolSize则新的任务会放到缓存队列中。        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))        //最后缓冲队列加入失败,则会继续加入线程。

假设加入新的线程失败,则拒绝这个任务。 reject(command); }

还有些其它的參数:

private volatile ThreadFactory threadFactory //线程的工厂函数。

private volatile RejectedExecutionHandler handler;//任务拒绝的处理类。 private volatile long keepAliveTime;//任务等待的是将。

ThreadPoolExecutor有几个构造方法来初始化这些參数。Executors类将这些參数简化了来获得一个ExecutorService的引用。

public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue
()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
(), threadFactory); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue
()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue
(), threadFactory); }

     这四个方法中前两个的核心线程数和最大线程数同样,全部可执行的线程数是固定的,<=nThreads。

当任务数大于nThreads时,就是放入缓冲队列中。  后两个方法中,线程数是无边界的,核心线程数是0,最大线程数是整型的最大值,然后假设有线程60秒内没有任务执行的话就销毁。每次有新的任务来,都会创建新的线程或使用曾经创建的线程(60秒内没有任务执行的线程)。

你可能有疑问。既然核心线程数是0,那么全部的任务不是都放到队里里了吗?那么如今就来看看SynchronousQueue这个队里,能够看看这里的介绍http://wsmajunfeng.iteye.com/blog/1629352/。

    回过头来看看任务提交方法的源代码:

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {//这里是在往队列里方任务,假设不成功就会加入Worker(封装了线程对象)            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }
      上面链接里的博客提到:offer()往queue里放一个element后马上返回,假设碰巧这个element被还有一个thread取走了,offer方法返回true。觉得offer成功;否则返回false。
试想一下,第一次提交任务的时候,核心线程数为0,此时没有线程所以没有线程从workQueue中取东西,所以这里的workQueue.offer(command)会返回false,那么就会通过addWorker(command, false)创建一个新的线程。

转载于:https://www.cnblogs.com/gcczhongduan/p/5059023.html

你可能感兴趣的文章
linux的子进程调用exec( )系列函数
查看>>
MSChart的研究
查看>>
C# 索引器
查看>>
MySQLdb & pymsql
查看>>
zju 2744 回文字符 hdu 1544
查看>>
delphi 内嵌汇编例子
查看>>
【luogu P2298 Mzc和男家丁的游戏】 题解
查看>>
前端笔记-bom
查看>>
MATLAB作图方法与技巧(一)
查看>>
上海淮海中路上苹果旗舰店门口欲砸一台IMAC电脑维权
查看>>
Google透露Android Market恶意程序扫描服务
查看>>
给mysql数据库字段值拼接前缀或后缀。 concat()函数
查看>>
迷宫问题
查看>>
【FZSZ2017暑假提高组Day9】猜数游戏(number)
查看>>
泛型子类_属性类型_重写方法类型
查看>>
eclipse-将同一个文件分屏显示
查看>>
对闭包的理解
查看>>
练习10-1 使用递归函数计算1到n之和(10 分
查看>>
Oracle MySQL yaSSL 不明细节缓冲区溢出漏洞2
查看>>
windows编程ASCII问题
查看>>