Java并发之FutureTask源码浅析

属性

状态属性

Possible state transitions:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED
    // 当前task状态
    private volatile int state;
    // 任务尚未执行
    private static final int NEW          = 0;
    // 任务正在结束,但未完成
    private static final int COMPLETING   = 1;
    // 任务正常结束
    private static final int NORMAL       = 2;
    // 任务执行中发生异常
    private static final int EXCEPTIONAL  = 3;
    // 当前任务被取消
    private static final int CANCELLED    = 4;
    // 当前任务中断中
    private static final int INTERRUPTING = 5;
    // 当前任务已中断
    private static final int INTERRUPTED  = 6;

其他属性

// runnable使用适配器模式伪装成callable
private Callable<V> callable;
// 执行结果或抛出的异常
private Object outcome;
// 保存执行任务的线程对象引用
private volatile Thread runner;
// 可能有多个线程get任务结果,所以使用了一种栈结构
private volatile WaitNode waiters;

构造方法

注意到线程池调用的submit重载方法有callablerunnable两种,解释了FutureTask为什么要有两种不同的构造方法

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

两种构造方法一种是callable,另一种是runnable

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    // callable是程序员自己实现的业务类
    this.callable = callable;
    // 设置当前任务状态为NEW
    this.state = NEW;
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;
}

跟入Executors.callable方法

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    // 适配器模式将runnable转为callable
    return new RunnableAdapter<T>(task, result);
}

// 继续跟入
// 实现了Callable接口
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        // 注意到并没有返回真正的执行结果
        return result;
    }
}

run

submut方法通过newTaskFor方法创建FutureTask对象,通过execute方法将FutureTask对象真正提交到线程池中(参考上文代码)

线程池的执行入口是FutureTask对象的run

public void run() {
    // 条件一:task已被执行
    // 条件二:CAS失败(当前任务已被其他线程抢占)
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread()))
        return;
    // NEW状态且抢占成功
    try {
        Callable<V> c = callable;
        // 防止空指针和外部线程cancel了当前任务
        if (c != null && state == NEW) {
            // 执行结果的引用
            V result;
            // true:callable.run代码执行成功无异常
            // false:callable.run执行失败有异常
            boolean ran;
            try {
                // 调用程序员实现的callable
                // 调用被装饰后的runnable
                result = c.call();
                // call无异常
                ran = true;
            } catch (Throwable ex) {
                // 执行异常
                result = null;
                ran = false;
                // 设置异常
                setException(ex);
            }
            // 设置执行结果
            if (ran)
                set(result);
        }
    } finally {
        // 执行任务的线程清空
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            // 后续分析
            handlePossibleCancellationInterrupt(s);
    }
}

setsetException方法类似

protected void set(V v) {
    // 使用CAS设置任务完成中
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 执行结果赋值
        outcome = v;
        // 设置任务状态为正常结束(直接设置内存)
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    // 使用CAS设置任务完成中
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 这里的t是抛出的异常
        outcome = t;
        // 设置任务状态为异常(直接设置内存)
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

get

// 多个线程等待当前任务执行结果
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 状态还未结束
    if (s <= COMPLETING)
        // 实现阻塞(核心)
        s = awaitDone(false, 0L);
    return report(s);
}

核心方法awaitDone

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // 超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    // 当前引用线程封装为wait node
    WaitNode q = null;
    // 表示当前线程没有入队
    boolean queued = false;
    for (;;) {
        // 当前线程是否被其他线程使用中断方式唤醒
        // 只有第一次会返回true
        if (Thread.interrupted()) {
            // 当前线程从wait node出队(下文分析)
            removeWaiter(q);
            // 终端异常
            throw new InterruptedException();
        }
        int s = state;
        // 当前线程被unpark方式唤醒
        // 如果最新状态是已经得到结果
        if (s > COMPLETING) {
            // 判断是否为当前线程创建过wait node
            if (q != null)
                // GC
                q.thread = null;
            return s;
        }
        // 正在完成中
        else if (s == COMPLETING)
            // 释放CPU进行下一次抢占
            Thread.yield();
        // 第一次循环:创建wait node对象
        else if (q == null)
            q = new WaitNode();
        // 第二次循环:已创建wait node对象但还未入队
        else if (!queued)
            // waiters执行队列头
            // CAS方式入队
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
        // 第三次自旋:
        else if (timed) {
            // 处理有超时的情况
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            // 当前get操作的线程变为waiting状态
            // 除非有其他线程唤醒或中断
            LockSupport.park(this);
    }
}

跟入removeWaiter方法

private void removeWaiter(WaitNode node) {
    if (node != null) {
        // 引用线程GC
        node.thread = null;
        retry:
        for (;;) {
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                // 下一个节点是否为空
                if (q.thread != null)
                    // 前移
                    pred = q;
                // 当前节点是空,也就是不为头节点
                else if (pred != null) {
                    // 移除当前节点
                    pred.next = s;
                    if (pred.thread == null)
                        continue retry;
                }
                // CAS方式直接指向下一个节点
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                        q, s))
                    continue retry;
            }
            break;
        }
    }
}

get最后的report方法

private V report(int s) throws ExecutionException {
    // 执行结果或异常
    Object x = outcome;
    if (s == NORMAL)
        // 正常结果直接返回
        return (V)x;
    if (s >= CANCELLED)
        // 用户取消则抛出取消异常
        throw new CancellationException();
    // 抛出执行异常
    throw new ExecutionException((Throwable)x);
}

finishCompletion

在分析到runset方法中,有finishCompletion方法未分析

private void finishCompletion() {
    // 从头遍历队列
    for (WaitNode q; (q = waiters) != null;) {
        // CAS置空防其他线程操作
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                // 当前wait node封装的线程
                Thread t = q.thread;
                if (t != null) {
                    // GC
                    q.thread = null;
                    // 唤醒当前线程
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    // 当前处理完最后一个节点
                    break;
                // GC
                q.next = null;
                // 遍历队列
                q = next;
            }
            break;
        }
    }

    // done方法为空
    done();

    // GC
    callable = null;
}

cancel

用户如果主动调用,可以取消当前任务

public boolean cancel(boolean mayInterruptIfRunning) {
    // 任务正在运行中尝试用CAS方式中断或取消
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        // 取反又返回false有点绕
        return false;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                // 当前线程不为空
                if (t != null)
                    // 中断当前线程
                    t.interrupt();
            } finally {
                // 终端完成设置已中断
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒所有get阻塞的线程
        finishCompletion();
    }
    return true;
}