亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

futureTask的超時(shí)原理解析

liangzai_cool / 1403人閱讀

摘要:序本文主要解析一下的超時(shí)原理。這里等待超時(shí),然后返回原始狀態(tài)小結(jié)由此可見,超時(shí)的機(jī)制其實(shí)不能中斷里頭實(shí)際執(zhí)行的動(dòng)作,超時(shí)只是讓調(diào)用線程能夠在指定時(shí)間返回而已,而底層調(diào)用的方法,實(shí)際還在執(zhí)行。這里是需要額外注意的。

本文主要解析一下futureTask的超時(shí)原理。

實(shí)例
ExecutorService executor = Executors.newFixedThreadPool(1);
Future future = executor.submit(
                    new Callable() {
                        public Void call() throws Exception {
                            //do something
                            return null;
                        }
                    });
future.get(500, TimeUnit.MILLISECONDS);

里頭構(gòu)造的是java/util/concurrent/ThreadPoolExecutor.java

submit

java/util/concurrent/AbstractExecutorService.java

public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
}
protected  RunnableFuture newTaskFor(Callable callable) {
        return new FutureTask(callable);
}
execute

java/util/concurrent/ThreadPoolExecutor.java

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn"t, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

這里只是放入workQueue,然后判斷是否需要添加線程

runWorker

java/util/concurrent/ThreadPoolExecutor.java

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這里循環(huán)從workQueue取出task,然后調(diào)用task.run()

futureTask.run

java/util/concurrent/FutureTask.java

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

這里如果執(zhí)行完成的話,會(huì)調(diào)用set(result),而異常的話,會(huì)調(diào)用setException(ex)

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
}
protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
}

都把狀態(tài)從NEW設(shè)置為COMPLETING

future.get(long)
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

這里看awaitDone,等待指定的時(shí)候,發(fā)現(xiàn)狀態(tài)不是COMPLETING,則拋出TimeoutException,讓調(diào)用線程返回。

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

這里等待超時(shí),然后返回原始狀態(tài)

小結(jié)

由此可見,超時(shí)的機(jī)制其實(shí)不能中斷callable里頭實(shí)際執(zhí)行的動(dòng)作,超時(shí)只是讓調(diào)用線程能夠在指定時(shí)間返回而已,而底層調(diào)用的方法,實(shí)際還在執(zhí)行。這里是需要額外注意的。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/67850.html

相關(guān)文章

  • FutureTask源碼分析筆記

    摘要:主要的實(shí)現(xiàn)實(shí)際上運(yùn)行還是一個(gè),它對(duì)做了一個(gè)封裝,讓開發(fā)人員可以從其中獲取返回值是有狀態(tài)的共種狀態(tài),四種狀態(tài)變換的可能和的區(qū)別通過方法調(diào)用有返回值可以拋異常結(jié)果的實(shí)現(xiàn)原理判斷狀態(tài)非狀態(tài)則直接進(jìn)入返回結(jié)果處于狀態(tài),則進(jìn)入等待流程獲 主要的實(shí)現(xiàn)FutureTask # FutureTask實(shí)際上運(yùn)行還是一個(gè)runnable,它對(duì)callable做了一個(gè)封裝,讓開發(fā)人員可以從其中獲取返回值; ...

    PascalXie 評(píng)論0 收藏0
  • 追蹤解析 FutureTask 源碼

    摘要:零前期準(zhǔn)備文章異常啰嗦且繞彎。版本版本簡介是中默認(rèn)的實(shí)現(xiàn)類,常與結(jié)合進(jìn)行多線程并發(fā)操作。所以方法的主體其實(shí)就是去喚醒被阻塞的線程。本文僅為個(gè)人的學(xué)習(xí)筆記,可能存在錯(cuò)誤或者表述不清的地方,有緣補(bǔ)充 零 前期準(zhǔn)備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡介 ...

    xcc3641 評(píng)論0 收藏0
  • FutureTask源碼分析

    摘要:從而可以啟動(dòng)和取消異步計(jì)算任務(wù)查詢異步計(jì)算任務(wù)是否完成和獲取異步計(jì)算任務(wù)的返回結(jié)果。原理分析在分析中我們沒有看它的父類,其中有一個(gè)方法,返回一個(gè),說明該方法可以獲取異步任務(wù)的返回結(jié)果。 FutureTask介紹 FutureTask是一種可取消的異步計(jì)算任務(wù)。它實(shí)現(xiàn)了Future接口,代表了異步任務(wù)的返回結(jié)果。從而FutureTask可以啟動(dòng)和取消異步計(jì)算任務(wù)、查詢異步計(jì)算任務(wù)是否完成...

    luqiuwen 評(píng)論0 收藏0
  • FutureTask

    摘要:可取消的異步計(jì)算。只有在計(jì)算完成后才能檢索結(jié)果如果計(jì)算還沒有完成,方法將會(huì)被阻塞。任務(wù)正常執(zhí)行結(jié)束。任務(wù)執(zhí)行過程中發(fā)生異常。任務(wù)即將被中斷。運(yùn)行完成后將會(huì)清空。根據(jù)執(zhí)行結(jié)果設(shè)置狀態(tài)。 FutureTask What is it ? 可取消的異步計(jì)算。該類提供了 Future的基本實(shí)現(xiàn),其中包括啟動(dòng)和取消計(jì)算的方法,查詢計(jì)算是否完成以及檢索計(jì)算結(jié)果的方法。只有在計(jì)算完成后才能檢索...

    GeekGhc 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<