亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

高性能SPSC無鎖隊(duì)列設(shè)計(jì)之路

evin2016 / 3021人閱讀

摘要:當(dāng)多線程修改互相獨(dú)立的變量時(shí),如果這些變量共享同一個(gè)緩存行,就會(huì)無意中影響彼此的性能,這就是偽共享。

本文整理了Single Producer/Consumer lock free Queue step by step這篇文章里頭關(guān)于高性能的SPSC無鎖隊(duì)列使用遵循的幾個(gè)原則:

單寫原則

使用lazySet替代volatile set

使用位運(yùn)算替代取模運(yùn)算

避免偽共享

減少緩存一致性沖突

1.Single Writer Principle(單寫原則)

如果只有一個(gè)線程對(duì)資源進(jìn)行寫操作,它實(shí)際上是比你想象的更容易,這個(gè)方案是可行的,無需CPU浪費(fèi)管理資源爭奪或上下文切換。當(dāng)然,如果有多個(gè)線程讀取相同的數(shù)據(jù)。CPU可以通過高速緩存一致性的子系統(tǒng)廣播只讀數(shù)據(jù)的拷貝到其他核。這雖然有成本的,但它的尺度非常好。
多個(gè)線程如果同時(shí)寫同一個(gè)資源,必有爭奪,就需要用鎖或樂觀鎖等堵塞方法,而非堵塞的單線程寫比多線程寫要快,能獲得高吞吐量和低延遲,特別是多核情況,一個(gè)線程一個(gè)CPU核,大大增加其他CPU核并行運(yùn)行其他線程的概率。

Method Time (ms)
One Thread 300
One Thread with Memory Barrier 4,700
One Thread with CAS 5,700
Two Threads with CAS 18,000
One Thread with Lock 10,000
Two Threads with Lock 118,000

Disruptor分離了關(guān)注,真正實(shí)現(xiàn)單寫原則。(Disruptor的特點(diǎn)是將多線程生產(chǎn)者通過Ringbuffer變成單線程消費(fèi)者,通過單線程消費(fèi)者對(duì)共享資源進(jìn)行寫操作)
目前 Node.js, Erlang, Actor 模式, SEDA 都采取了單寫解決方案,但是他們大多數(shù)使用基于隊(duì)列的下實(shí)現(xiàn)的,它打破多帶帶寫原則

2.使用lazySet替代volatile set

lazySet是使用Unsafe.putOrderedObject方法,會(huì)前置一個(gè)store-store barrier(在當(dāng)前的硬件體系下或者是no-op或者非常輕),而不是store-load barrier。

store-load barrier較慢,總是用在volatile的寫操作上。在操作序列Store1; StoreStore;Store2中,Store1的數(shù)據(jù)會(huì)在Store2和后續(xù)寫操作之前對(duì)其它處理器可見。換句話說,就是保證了對(duì)其它數(shù)據(jù)可見的寫的順序。

如果只有一個(gè)線程寫我們就用不著store-load barrier,lazySet和volatile set在單寫原則下面是等價(jià)的。

這種性能提升是有代價(jià)的,雖然廉價(jià),也就是寫后結(jié)果并不會(huì)被其他線程看到,甚至是自己的線程,通常是幾納秒后被其他線程看到,lazySet的寫在實(shí)踐上來延遲是納秒級(jí),這個(gè)時(shí)間比較短,所以代價(jià)可以忍受。

類似Unsafe.putOrderedObject還有unsafe.putOrderedLong等方法,unsafe.putOrderedLong比使用 volatile long要快3倍左右。

3.使用位運(yùn)算替代取模運(yùn)算

比如這段

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail;
        final long wrapPoint = currentTail - buffer.length;
        if (head <= wrapPoint) {
            return false;
        }

        buffer[(int) (currentTail % buffer.length)] = e;
        tail = currentTail + 1;

        return true;
    }

使用位運(yùn)算之后

mask = capacity - 1;
public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }
性能對(duì)比

x % 8 == x & (8 - 1) 但是位運(yùn)算速度更快

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class ModuloMaskTest {

    private static final int LENGTH = 16;
    int[] ints = new int[LENGTH];
    int mask = LENGTH - 1;
    int someIndex = 5;

    @Benchmark
    public int moduloLengthNoMask() {
        return someIndex % ints.length;
    }

    @Benchmark
    public int moduloLengthMask() {
        return someIndex & (ints.length - 1);
    }

    @Benchmark
    public int moduloConstantLengthNoMask() {
        return someIndex % LENGTH;
    }

    @Benchmark
    public int moduloMask() {
        return someIndex & mask;
    }

    @Benchmark
    public int consume() {
        return someIndex;
    }
    @Benchmark
    public void noop() {
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(".*" +ModuloMaskTest.class.getSimpleName()+ ".*")
                .forks(1)
                .build();
        new Runner(opt).run();
    }
}

結(jié)果如下:

# Run complete. Total time: 00:07:34

Benchmark                                  Mode  Cnt  Score   Error  Units
ModuloMaskTest.consume                     avgt   20  3.099 ± 0.152  ns/op
ModuloMaskTest.moduloConstantLengthNoMask  avgt   20  3.430 ± 0.509  ns/op
ModuloMaskTest.moduloLengthMask            avgt   20  3.505 ± 0.058  ns/op
ModuloMaskTest.moduloLengthNoMask          avgt   20  6.490 ± 0.143  ns/op
ModuloMaskTest.moduloMask                  avgt   20  3.304 ± 0.159  ns/op
ModuloMaskTest.noop                        avgt   20  0.404 ± 0.010  ns/op

可以發(fā)現(xiàn)%操作性能最差要6.x納秒,&操作基本在3ns左右

4.避免偽共享 L1 L2 L3 cache

當(dāng) CPU 執(zhí)行運(yùn)算的時(shí)候,它先去 L1 查找所需的數(shù)據(jù),再去 L2,然后是L3,最后如果這些緩存中都沒有,所需的數(shù)據(jù)就要去主內(nèi)存拿。走得越遠(yuǎn),運(yùn)算耗費(fèi)的時(shí)間就越長。所以如果你在做一些很頻繁的事,你要確保數(shù)據(jù)在 L1 緩存中。

從CPU到 大約需要的CPU周期 大約需要的時(shí)間
主存 約60-80ns
QPI 總線傳輸(between sockets, not drawn) 約20ns
L3 cache 約40-45 cycles 約15ns
L2 cache 約10 cycles 約3ns
L1 cache 約3-4 cycles 約1ns
寄存器 1 cycle

可見CPU讀取主存中的數(shù)據(jù)會(huì)比從L1中讀取慢了近2個(gè)數(shù)量級(jí)。

定義

Cache是由很多個(gè)cache line組成的。每個(gè)cache line通常是64字節(jié),并且它有效地引用主內(nèi)存中的一塊兒地址。一個(gè)Java的long類型變量是8字節(jié),因此在一個(gè)緩存行中可以存8個(gè)long類型的變量。
CPU每次從主存中拉取數(shù)據(jù)時(shí),會(huì)把相鄰的數(shù)據(jù)也存入同一個(gè)cache line。
在訪問一個(gè)long數(shù)組的時(shí)候,如果數(shù)組中的一個(gè)值被加載到緩存中,它會(huì)自動(dòng)加載另外7個(gè)。因此你能非??斓谋闅v這個(gè)數(shù)組。事實(shí)上,你可以非??焖俚谋闅v在連續(xù)內(nèi)存塊中分配的任意數(shù)據(jù)結(jié)構(gòu)。這種無法充分使用緩存行特性的現(xiàn)象,稱為偽共享。

當(dāng)多線程修改互相獨(dú)立的變量時(shí),如果這些變量共享同一個(gè)緩存行,就會(huì)無意中影響彼此的性能,這就是偽共享。緩存行上的寫競爭是運(yùn)行在SMP系統(tǒng)中并行線程實(shí)現(xiàn)可伸縮性最重要的限制因素。有人將偽共享描述成無聲的性能殺手。

圖1說明了偽共享的問題。在核心1上運(yùn)行的線程想更新變量X,同時(shí)核心2上的線程想要更新變量Y。不幸的是,這兩個(gè)變量在同一個(gè)緩存行中。每個(gè)線程都要去競爭緩存行的所有權(quán)來更新變量。如果核心1獲得了所有權(quán),緩存子系統(tǒng)將會(huì)使核心2中對(duì)應(yīng)的緩存行失效。當(dāng)核心2獲得了所有權(quán)然后執(zhí)行更新操作,核心1就要使自己對(duì)應(yīng)的緩存行失效。這會(huì)來來回回的經(jīng)過L3緩存,大大影響了性能。如果互相競爭的核心位于不同的插槽,就要額外橫跨插槽連接,問題可能更加嚴(yán)重。

解決

對(duì)于偽共享,一般的解決方案是,增大數(shù)組元素的間隔使得由不同線程存取的元素位于不同的緩存行上,以空間換時(shí)間。在jdk1.8中,有專門的注解@Contended來避免偽共享,更優(yōu)雅地解決問題。

@Contended
public class VolatileLong {
    public volatile long value = 0L;
}

public class FalseSharingJdk8 implements Runnable {
    public static int NUM_THREADS = 4; // change
    public final static long ITERATIONS = 500L * 1000L * 1000L;
    private final int arrayIndex;
    private static VolatileLong[] longs;

    public FalseSharingJdk8(final int arrayIndex) {
        this.arrayIndex = arrayIndex;
    }

    /**
     * -XX:-RestrictContended
     * –XX:+PrintFieldLayout  --- 只是在調(diào)試版jdk有效
     * @param args
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception {
        Thread.sleep(10000);
        System.out.println("starting....");
        if (args.length == 1) {
            NUM_THREADS = Integer.parseInt(args[0]);
        }

        longs = new VolatileLong[NUM_THREADS];
        for (int i = 0; i < longs.length; i++) {
            longs[i] = new VolatileLong();
        }
        final long start = System.nanoTime();
        runTest();
        System.out.println("duration = " + (System.nanoTime() - start));
    }

    private static void runTest() throws InterruptedException {
        Thread[] threads = new Thread[NUM_THREADS];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(new FalseSharingJdk8(i));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }

    public void run() {
        long i = ITERATIONS + 1;
        while (0 != --i) {
            longs[arrayIndex].value = i;
        }
    }
}

沒有使用注解的話,需要自己去填充

public final static class ValuePadding {
        protected long p1, p2, p3, p4, p5, p6, p7;
        protected volatile long value = 0L;
        protected long p9, p10, p11, p12, p13, p14;
        protected long p15;
    }
5.減少緩存一致性沖突

只要系統(tǒng)只有一個(gè)CPU核在工作,一切都沒問題。如果有多個(gè)核,每個(gè)核又都有自己的緩存,那么我們就遇到問題了:如果某個(gè)CPU緩存段中對(duì)應(yīng)的內(nèi)存內(nèi)容被另外一個(gè)CPU偷偷改了,會(huì)發(fā)生什么?
緩存一致性協(xié)議就是為了解決這個(gè)問題而設(shè)計(jì)的,使多組緩存的內(nèi)容保持一致,即使用多組緩存,但使它們的行為看起來就像只有一組緩存那樣。

    private final AtomicLong tail = new AtomicLong(0);
    private final AtomicLong head = new AtomicLong(0);

    public static class PaddedLong {
        public long value = 0, p1, p2, p3, p4, p5, p6;
    }

    private final PaddedLong tailCache = new PaddedLong();
    private final PaddedLong headCache = new PaddedLong();

    public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - capacity;
        if (headCache.value <= wrapPoint) {
            headCache.value = head.get();
            if (headCache.value <= wrapPoint) {
                return false;
            }
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tailCache.value) {
            tailCache.value = tail.get();
            if (currentHead >= tailCache.value) {
                return null;
            }
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }

對(duì)比沒有cache的版本

private final AtomicLong tail = new AtomicLong(0);
private final AtomicLong head = new AtomicLong(0);

public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException("Null is not a valid element");
        }

        final long currentTail = tail.get();
        final long wrapPoint = currentTail - buffer.length;
        if (head.get() <= wrapPoint) {
            return false;
        }

        buffer[(int) currentTail & mask] = e;
        tail.lazySet(currentTail + 1);

        return true;
    }

    public E poll() {
        final long currentHead = head.get();
        if (currentHead >= tail.get()) {
            return null;
        }

        final int index = (int) currentHead & mask;
        final E e = buffer[index];
        buffer[index] = null;
        head.lazySet(currentHead + 1);

        return e;
    }

對(duì)比數(shù)據(jù)

0 - ops/sec=56,689,539 - OneToOneConcurrentArrayQueue2 result=777
1 - ops/sec=33,578,974 - OneToOneConcurrentArrayQueue2 result=777
2 - ops/sec=54,105,692 - OneToOneConcurrentArrayQueue2 result=777
3 - ops/sec=84,290,815 - OneToOneConcurrentArrayQueue2 result=777
4 - ops/sec=79,851,727 - OneToOneConcurrentArrayQueue2 result=777
-----
0 - ops/sec=110,506,679 - OneToOneConcurrentArrayQueue3 result=777
1 - ops/sec=117,252,276 - OneToOneConcurrentArrayQueue3 result=777
2 - ops/sec=115,639,936 - OneToOneConcurrentArrayQueue3 result=777
3 - ops/sec=116,555,884 - OneToOneConcurrentArrayQueue3 result=777
4 - ops/sec=115,712,336 - OneToOneConcurrentArrayQueue3 result=777

整體上有一定的提升。

doc

Single Producer/Consumer lock free Queue step by step

Single Writer Principle

Atomic*.lazySet is a performance win for single writers

多帶帶寫原則Single Writer Principle

Java性能優(yōu)化要點(diǎn)之五: 隊(duì)列與lazySet

學(xué)習(xí)一下Disruptor

The Mythical Modulo Mask

偽共享和緩存行填充,從Java 6, Java 7 到Java 8

Java8的偽共享和緩存行填充--@Contended注釋

Diving Deeper into Cache Coherency

緩存一致性(Cache Coherency)入門

偽共享(False Sharing)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/70363.html

相關(guān)文章

  • JCTools簡介

    摘要:序是一款對(duì)并發(fā)數(shù)據(jù)結(jié)構(gòu)進(jìn)行增強(qiáng)的并發(fā)工具,主要提供了以及的增強(qiáng)數(shù)據(jù)結(jié)構(gòu)。只有幾個(gè)方法對(duì)比主要是操作之后沒有立即返回是對(duì)的增強(qiáng),對(duì)多的支持以及高并發(fā)更新提供更好的性能。是對(duì)的簡單包裝以支持的接口。 序 JCTools是一款對(duì)jdk并發(fā)數(shù)據(jù)結(jié)構(gòu)進(jìn)行增強(qiáng)的并發(fā)工具,主要提供了map以及queue的增強(qiáng)數(shù)據(jù)結(jié)構(gòu)。原來netty還是自己寫的MpscLinkedQueueNode,后來新版本就換成...

    YJNldm 評(píng)論0 收藏0
  • 十.Go并發(fā)編程--channel使用

    摘要:比如主協(xié)程啟動(dòng)個(gè)子協(xié)程,主協(xié)程等待所有子協(xié)程退出后再繼續(xù)后續(xù)流程,這種場景下也可輕易實(shí)現(xiàn)。這個(gè)例子中,父協(xié)程僅僅是等待子協(xié)程結(jié)束,其實(shí)父協(xié)程也可以向管道中寫入數(shù)據(jù)通知子協(xié)程結(jié)束,這時(shí)子協(xié)程需要定期地探測管道中是否有消息出現(xiàn)。一.設(shè)計(jì)原理Go 語言中最常見的、也是經(jīng)常被人提及的設(shè)計(jì)模式就是:不要通過共享內(nèi)存來通信,我們應(yīng)該使用通信來共享內(nèi)存通過共享內(nèi)存來通信是直接讀取內(nèi)存的數(shù)據(jù),而通過通信來共...

    supernavy 評(píng)論0 收藏0
  • 【推薦】最新200篇:技術(shù)文章整理

    摘要:作為面試官,我是如何甄別應(yīng)聘者的包裝程度語言和等其他語言的對(duì)比分析和主從復(fù)制的原理詳解和持久化的原理是什么面試中經(jīng)常被問到的持久化與恢復(fù)實(shí)現(xiàn)故障恢復(fù)自動(dòng)化詳解哨兵技術(shù)查漏補(bǔ)缺最易錯(cuò)過的技術(shù)要點(diǎn)大掃盲意外宕機(jī)不難解決,但你真的懂?dāng)?shù)據(jù)恢復(fù)嗎每秒 作為面試官,我是如何甄別應(yīng)聘者的包裝程度Go語言和Java、python等其他語言的對(duì)比分析 Redis和MySQL Redis:主從復(fù)制的原理詳...

    BicycleWarrior 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

evin2016

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<