亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

談談Java任務的并行處理

fasss / 2719人閱讀

摘要:前言談到并行,我們可能最先想到的是線程,多個線程一起運行,來提高我們系統(tǒng)的整體處理速度為什么使用多個線程就能提高處理速度,因為現(xiàn)在計算機普遍都是多核處理器,我們需要充分利用資源如果站的更高一點來看,我們每臺機器都可以是一個處理節(jié)點,多臺機器

前言

談到并行,我們可能最先想到的是線程,多個線程一起運行,來提高我們系統(tǒng)的整體處理速度;為什么使用多個線程就能提高處理速度,因為現(xiàn)在計算機普遍都是多核處理器,我們需要充分利用cpu資源;如果站的更高一點來看,我們每臺機器都可以是一個處理節(jié)點,多臺機器并行處理;并行的處理方式可以說無處不在,本文主要來談談Java在并行處理方面的努力。

無處不在的并行

Java的垃圾回收器,我們可以看到每一代版本的更新,伴隨著GC更短的延遲,從serial到cms再到現(xiàn)在的G1,一直在摘掉Java慢的帽子;消息隊列從早期的ActiveMQ到現(xiàn)在的kafka和RocketMQ,引入的分區(qū)的概念,提高了消息的并行性;數(shù)據(jù)庫單表數(shù)據(jù)到一定量級之后,訪問速度會很慢,我們會對表進行分表處理,引入數(shù)據(jù)庫中間件;Redis你可能覺得本身處理是單線程的,但是Redis的集群方案中引入了slot(槽)的概念;更普遍的就是我們很多的業(yè)務系統(tǒng),通常會部署多臺,通過負載均衡器來進行分發(fā);好了還有其他的一些例子,此處不在一一例舉。

如何并行

我覺得并行的核心在于"拆分",把大任務變成小任務,然后利用多核CPU也好,還是多節(jié)點也好,同時并行的處理,Java歷代版本的更新,都在為我們開發(fā)者提供更方便的并行處理,從開始的Thread,到線程池,再到fork/join框架,最后到流處理,下面使用簡單的求和例子來看看各種方式是如何并行處理的;

單線程處理

首先看一下最簡單的單線程處理方式,直接使用主線程進行求和操作;

public class SingleThread {

    public static long[] numbers;

    public static void main(String[] args) {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        long sum = 0;
        for (int i = 0; i < numbers.length; i++) {
            sum += numbers[i];
        }
        System.out.println("sum  = " + sum);
    }

}

求和本身是一個計算密集型任務,但是現(xiàn)在已經(jīng)是多核時代,只用單線程,相當于只使用了其中一個cpu,其他cpu被閑置,資源的浪費;

Thread方式

我們把任務拆分成多個小任務,然后每個小任務分別啟動一個線程,如下所示:

public class ThreadTest {

    public static final int THRESHOLD = 10_000;
    public static long[] numbers;
    private static long allSum;

    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            new Thread(new Runnable() {
                public void run() {
                    sumAll(sum((key - 1) * THRESHOLD, key * THRESHOLD));
                }
            }).start();
        }
        Thread.sleep(100);
        System.out.println("allSum = " + getAllSum());
    }

    private static synchronized long sumAll(long threadSum) {
        return allSum += threadSum;
    }

    public static synchronized long getAllSum() {
        return allSum;
    }

    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

以上指定了一個拆分閥值,計算拆分多少個認為,同時啟動多少線程;這種處理就是啟動的線程數(shù)過多,而CPU數(shù)有限,更重要的是求和是一個計算密集型任務,啟動過多的線程只會帶來更多的線程上下文切換;同時線程處理完一個任務就終止了,也是對資源的浪費;另外可以看到主線程不知道何時子任務已經(jīng)處理完了,需要做額外的處理;所有Java后續(xù)引入了線程池。

線程池方式

jdk1.5引入了并發(fā)包,其中包括了ThreadPoolExecutor,相關代碼如下:

public class ExecutorServiceTest {

    public static final int THRESHOLD = 10_000;
    public static long[] numbers;

    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        CompletionService completionService = new ExecutorCompletionService(executor);
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            completionService.submit(new Callable() {

                @Override
                public Long call() throws Exception {
                    return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                }
            });
        }
        long sumValue = 0;
        for (int i = 0; i < taskSize; i++) {
            sumValue += completionService.take().get();
        }
        // 所有任務已經(jīng)完成,關閉線程池
        System.out.println("sumValue = " + sumValue);
        executor.shutdown();
    }

    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

上面已經(jīng)分析了計算密集型并不是線程越多越好,這里創(chuàng)建了JDK默認的線程數(shù):CPU數(shù)+1,這是一個經(jīng)過大量測試以后給出的一個結(jié)果;線程池顧名思義,可以重復利用現(xiàn)有的線程;同時利用CompletionService來對子任務進行匯總;合理的使用線程池已經(jīng)可以充分的并行處理任務,只是在寫法上有點繁瑣,此時JDK1.7中引入了fork/join框架;

fork/join框架

分支/合并框架的目的是以遞歸的方式將可以并行的認為拆分成更小的任務,然后將每個子任務的結(jié)果合并起來生成整體結(jié)果;相關代碼如下:

public class ForkJoinTest extends java.util.concurrent.RecursiveTask {
    
    private static final long serialVersionUID = 1L;
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;

    public ForkJoinTest(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinTest(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        // 注:join方法會阻塞,因此有必要在兩個子任務的計算都開始之后才執(zhí)行join方法
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(forkJoinSum(10_000_000));
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask task = new ForkJoinTest(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

ForkJoinPool是ExecutorService接口的一個實現(xiàn),子認為分配給線程池中的工作線程;同時需要把任務提交到此線程池中,需要創(chuàng)建RecursiveTask的一個子類;大體邏輯就是通過fork進行拆分,然后通過join進行結(jié)果的合并,JDK為我們提供了一個框架,我們只需要在里面填充即可,更加方便;有沒有更簡單的方式,連拆分都省了,自動拆分合并,jdk在1.8中引入了流的概念;

流方式

Java8引入了stream的概念,可以讓我們更好的利用并行,使用流代碼如下:

public class StreamTest {

    public static void main(String[] args) {
        System.out.println("sum = " + parallelRangedSum(10_000_000));
    }

    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
    }
}

以上代碼是不是非常簡單,對于開發(fā)者來說完全不需要手動拆分,使用同步機制等方式,就可以讓任務并行處理,只需要對流使用parallel()方法,系統(tǒng)自動會對任務進行拆分,當然前提是沒有共享可變狀態(tài);其實并行流內(nèi)部使用的也是fork/join框架;

總結(jié)

本文使用一個求和的實例,來介紹了jdk為開發(fā)者提供并行處理的各種方式,可以看到Java一直在為提供更方便的并行處理而努力。

參考

<>

文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/74238.html

相關文章

  • 從JDK11新增HttpClient談談非阻塞模型

    摘要:是一個倡議,它提倡提供一種帶有非阻塞背壓的異步流處理的標準。是標準的實現(xiàn)之一。的實現(xiàn)細節(jié)請求響應的與請求響應的暴露為是請求的的消費者是響應的的生產(chǎn)者內(nèi)部的內(nèi)部 北京時間 9 月 26 日,Oracle 官方宣布 Java 11 正式發(fā)布 一、JDK HTTP Client介紹 JDK11中的17個新特性 showImg(https://segmentfault.com/img/remo...

    pingan8787 評論0 收藏0
  • Java8學習小記

    摘要:但有一個限制它們不能修改定義的方法的局部變量的內(nèi)容。如前所述,這種限制存在的原因在于局部變量保存在棧上,并且隱式表示它們僅限于其所在線程。 2014年,Oracle發(fā)布了Java8新版本。對于Java來說,這顯然是一個具有里程碑意義的版本。尤其是那函數(shù)式編程的功能,避開了Java那煩瑣的語法所帶來的麻煩。 這可以算是一篇Java8的學習筆記。將Java8一些常見的一些特性作了一個概要的...

    CHENGKANG 評論0 收藏0
  • PHP下異步嘗試二:初識協(xié)程

    摘要:如果僅依靠程序自動交出控制的話,那么一些惡意程序?qū)苋菀渍加萌繒r間而不與其他任務共享。多個操作可以在重疊的時間段內(nèi)進行。 PHP下的異步嘗試系列 如果你還不太了解PHP下的生成器,你可以根據(jù)下面目錄翻閱 PHP下的異步嘗試一:初識生成器 PHP下的異步嘗試二:初識協(xié)程 PHP下的異步嘗試三:協(xié)程的PHP版thunkify自動執(zhí)行器 PHP下的異步嘗試四:PHP版的Promise ...

    MudOnTire 評論0 收藏0
  • Java8實戰(zhàn)》-第四章讀書筆記(引入流Stream)

    摘要:內(nèi)部迭代與使用迭代器顯式迭代的集合不同,流的迭代操作是在背后進行的。流只能遍歷一次請注意,和迭代器類似,流只能遍歷一次。 流(Stream) 流是什么 流是Java API的新成員,它允許你以聲明性方式處理數(shù)據(jù)集合(通過查詢語句來表達,而不是臨時編寫一個實現(xiàn))。就現(xiàn)在來說,你可以把它們看成遍歷數(shù)據(jù)集的高級迭代器。此外,流還可以透明地并行處理,你無需寫任何多線程代碼了!我會在后面的筆記中...

    _ivan 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<