博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
追踪解析 FutureTask 源码
阅读量:6676 次
发布时间:2019-06-25

本文共 7249 字,大约阅读时间需要 24 分钟。

零 前期准备

0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

2 ThreadLocal 简介

FutureTask 是 jdk 中默认的 Future 实现类,常与 Callable 结合进行多线程并发操作。

3 Demo

import java.util.concurrent.*;public class FutureTaskDemo {    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {                //创建一个线程池        ExecutorService pool = Executors.newFixedThreadPool(1);        try{            //创建一个要执行的 Callable 对象            //此处其实 Runnable 对象也可以,但是通常不会那样做            Callable
task = () -> { //休眠三秒 TimeUnit.SECONDS.sleep(3); //返回一个字符串 return "hello"; }; //用 FutureTask 对象去包装 Callable FutureTask
futureTask = new FutureTask<>(task); //此处将 FutureTask 对象丢进线程池里 pool.submit(futureTask); //注意,此处的 futureTask 本质上是作为 Runnable 被丢进池子里的 //所以也可以用线程池的 execute(...) 方法 //pool.execute(futureTask) //还有一种更常见的执行方式是直接使用 Thread //new Thread(futureTask).start(); //获取结果 //注意,如果没有获取到的话此处会阻塞线程直到获取到为止 String result = futureTask.get(); //还有一种限时策略的结果获取 //超时的情况下会抛出异常 //String result = futureTask.get(1,TimeUnit.SECONDS); System.out.println(result); }finally { //关闭连接池 pool.shutdown(); } }}

一 FutureTask 的创建

回到 Demo 中的创建代码:

FutureTask
futureTask = new FutureTask<>(task);

追踪 FutureTask 的构造器:

//FutureTask.classpublic FutureTask(Callable
callable) { //有效性判断,不能为空 if (callable == null) throw new NullPointerException(); //记录下 callable 对象 this.callable = callable; //state 是一个 int 类型的对象,是一个 //NEW = 0 this.state = NEW;}

二 run

FutureTask 本身是 Runnable 的实现类,其在被 ThreadPoolExecutor 或者 Thread 对象消费的时候也是被当做 Runnable 的实现类的。

所以其本身的核心逻辑就必然在 run() 方法中:

//FutureTask.classpublic void run() {    //先判断状态,如果状态不是 NEW 就会直接返回    //RUNNER 是一个 VarHandler 类型的变量,指向了 FutureTask 中的 thread 变量,用于储存当前的线程    //但是如果 thread 已经不为 null,此处也会直接返回    //这两种返回条件都意味着此 FutureTask 的 run() 方法已经执行过了    if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))        return;    try {        //获取 callable        Callable
c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行 callable 的业务逻辑 result = c.call(); //ran 为成功标识 ran = true; } catch (Throwable ex) { //出错的情况下 result = null; ran = false; //不成功的情况下存入 exception setException(ex); } //如果成功的话会在此处进行操作 if (ran) set(result); } } finally { //置空 runner = null; int s = state; if (s >= INTERRUPTING) //如果此 FutreTask 的状态是中断状态,会在此处不断调用 Thread.yield() 空转 handlePossibleCancellationInterrupt(s); }}

此处有两个关键方法,即为 setException(...) 和 set(...):

//FutureTask.classprotected void setException(Throwable t) {    //用 CAS 操作比较并更新状态值    if (STATE.compareAndSet(this, NEW, COMPLETING)) {        //outcome 是一个 Object 对象,用于存储 callable 的返回值        //此处由于报错了,所以储存的是错误对象        outcome = t;        //EXCEPTIONAL = 3        STATE.setRelease(this, EXCEPTIONAL);        //最后清理工作,主要用于唤醒等待线程和执行 callable        finishCompletion();    }}//FutureTask.classprotected void set(V v) {    //基本逻辑和 setException(...) 方法雷同,只是 STATE 和 outcome 的储存值不同    if (STATE.compareAndSet(this, NEW, COMPLETING)) {        outcome = v;        STATE.setRelease(this, NORMAL);        finishCompletion();    }}

再来看 finishCompletion() 方法:

//FutureTask.classprivate void finishCompletion() {    //WaitNode 是 FutureTask 的静态内部类    //其本质上是单向链表的节点表示类,用于存放想要获取 Callable 的返回值但是被阻塞的线程的线程对象    for (WaitNode q; (q = waiters) != null;) {        //此处使用 CAS 将 q 从 WAITERS 里去除        if (WAITERS.weakCompareAndSet(this, q, null)) {            for (;;) {                Thread t = q.thread;                if (t != null) {                    //此处置空线程对象,帮助 GC                    q.thread = null;                    //唤醒线程                    LockSupport.unpark(t);                }                //接着往下遍历                WaitNode next = q.next;                if (next == null)                    break;                q.next = null;                 q = next;            }            break;        }    }    //此方法是空的    done();    //置空 callable    callable = null;}

之前提到过在 FutureTask 的 get(...) 方法中会阻塞线程,直到 Callable 执行完毕并能够获取返回值的时候才会结束阻塞。

所以 finishCompletion() 方法的主体其实就是去唤醒被阻塞的线程。

三 get

回到 Demo 中的创建代码:

String result = futureTask.get();

追踪 get() 方法:

//step 1//FutureTask.classpublic V get() throws InterruptedException, ExecutionException {    //此处先判断状态值,如果非 COMPLETING,即为还没完成,就会调用 awaitDone(...) 方法阻塞线程    int s = state;    if (s <= COMPLETING)        s = awaitDone(false, 0L);    //返回结果    return report(s);}//step 2//FutureTask.classprivate V report(int s) throws ExecutionException {    //获取需要返回的对象    Object x = outcome;    //如果是正常结束的就直接返回对象即可    if (s == NORMAL)        return (V)x;    //出错的情况下,抛异常    if (s >= CANCELLED)        throw new CancellationException();    throw new ExecutionException((Throwable)x);}

再来看一下阻塞线程的 awaitDone(...) 方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {        //循环的次数    long startTime = 0L;    //节点对象    WaitNode q = null;    //链表队列标识,代表该线程是否被加入链表中,初始为 false 代表未被加入    boolean queued = false;    for (;;) {        int s = state;        if (s > COMPLETING) { //如果 Callable 的执行已经完成            if (q != null)                q.thread = null;            return s;        }else if (s == COMPLETING) //Callable 的执行刚刚完成,后续工作还没做            Thread.yield();        else if (Thread.interrupted()) {            //线程被中断了,会抛出错误            removeWaiter(q);            throw new InterruptedException();        } else if (q == null) { //进入此处的判断证明 Callable 还未完成,所以会创建等待节点            //此处的 timed 传入为 false,不会在此返回            if (timed && nanos <= 0L)                return s;            q = new WaitNode(); //新建节点        }else if (!queued)            //queued 初始为 false,进入此处的时候会将上一个判断条件中新建的 q 加入到链表的首节点中            //并且 queued 变成 true            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);        else if (timed) {            //如果此操作是限时的,那么这里需要判断时间            final long parkNanos;            if (startTime == 0L) {                startTime = System.nanoTime();                if (startTime == 0L)                    startTime = 1L;                parkNanos = nanos;            } else {                long elapsed = System.nanoTime() - startTime;                if (elapsed >= nanos) {                    removeWaiter(q);                    return state;                }                parkNanos = nanos - elapsed;            }            if (state < COMPLETING)                //此处挂起线程,时间为 parkNanos                //本例中传入为 0L,所以是永久挂起                LockSupport.parkNanos(this, parkNanos);        }else            //永久挂起线程            LockSupport.park(this);    }}

四 一点唠叨

FutureTask 和 ThreadLocal 一样,都是 java.util.current 包中的小工具,封装不复杂,理解即可。


本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

转载地址:http://slgxo.baihongyu.com/

你可能感兴趣的文章
键盘过滤驱动
查看>>
SSL工作原理
查看>>
iOS中block实现的探究
查看>>
Hadoop JobHistory
查看>>
GridView编辑删除操作
查看>>
KMP算法的实现(Java语言描述)
查看>>
session销毁
查看>>
菜鸟学Java(二十二)——重新认识泛型
查看>>
wc命令
查看>>
noip模拟题题解集
查看>>
linux进程通信之共享内存
查看>>
live555
查看>>
mysql基础之存储引擎
查看>>
单例模式
查看>>
什么是模拟中继线?
查看>>
uCGUI动态内存管理
查看>>
主动发电
查看>>
【Android】 PopupWindow使用小结
查看>>
delphi webbrowser 经常用法演示样例
查看>>
sql异常
查看>>