亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

使用akka作異步任務(wù)處理

shiweifu / 1776人閱讀

摘要:創(chuàng)建訂單時同步操作有查詢庫存,扣款,刷新庫存可異步的操作有通知風(fēng)控系統(tǒng),給買家發(fā)送扣款郵件和短信,通知賣家,創(chuàng)建一些定時任務(wù)。

同步轉(zhuǎn)異步是一種常見的優(yōu)化手段,最近一次在做調(diào)優(yōu)時便大量使用了這種方式。通常在一個業(yè)務(wù)場景中會包含多個操作,有些操作的結(jié)果需要讓用戶立馬知道,但有些操作則不需要。這些用戶不需要等待結(jié)果的操作,我們在編程的時候便可以異步處理。這么做最直接的效果就是縮短接口響應(yīng)速度,提升用戶體驗。

我此次優(yōu)化的是下單場景。創(chuàng)建訂單時同步操作有: 查詢庫存,扣款,刷新庫存; 可異步的操作有: 通知風(fēng)控系統(tǒng),給買家發(fā)送扣款郵件和短信,通知賣家,創(chuàng)建一些定時任務(wù)。

最初我用的方案是Spring提供的@Async機制。這是一種很輕量的做法,只需要在可異步調(diào)用的方法上加上@Async注解即可。但是這種做法也存在兩個問題: 1. 不支持類內(nèi)部方法之間的調(diào)用。使用這種方式,我必須要把一些需要異步調(diào)用的方法轉(zhuǎn)移到一個新類里,這點讓人不爽。2. 當系統(tǒng)crash的時候,緩存的任務(wù)就丟了。因此,這個方案并不特別理想。

兩年之前用akka做過一個社交應(yīng)用的后端服務(wù),而且消息模型天生異步,所以自然想到了用akka。但是用akka的話也有一些地方需要注意。第一,Actor是單線程順序執(zhí)行,如果任務(wù)比較多最好使用actor router。actor router管理多個actor,可以做到一定限度的并行執(zhí)行。第二,使用有持久化actor,確保任務(wù)不會丟失。我會以發(fā)push提醒為例描述一下這個方案的實現(xiàn)細節(jié)。多數(shù)場景中發(fā)push提醒都可進行異步調(diào)用。

下單邏輯都放在OrderService中,下單成功給賣家發(fā)送push提醒時,Orderservice會給NotificationActor發(fā)送一個消息。

NotificationActor有兩個職責(zé):1. 保存接收到的任務(wù);2. 把消息轉(zhuǎn)發(fā)給NotificationWorker,當Worker執(zhí)行成功之后把消息刪除。在最新版本的akka中可以使用At-Least-Once Delivery實現(xiàn)這兩個功能。

NotificationWorkerRouter僅僅處理發(fā)送邏輯。WorkerActor以Router方式進行部署,以實現(xiàn)并行處理,提高處理效率。

下邊看一下具體實現(xiàn)細節(jié):

public class NotificationActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private ActorRef notificationWorkers = null;
    private final String uniqueId = UUID.randomUUID().toString();

    @Autowired
    public NotificationActor(final ActorSystemManager actorSystemManager) {
        this.notificationWorkers = actorSystemManager.notificationWorkers;
    }

    @Override public String persistenceId() {
        return "journal:notification-actor:" + uniqueId;
    }

    @Override public void onReceiveRecover(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            deliverAckMessage((NotificationMessage) msg);
        }
    }

    @Override public void onReceiveCommand(final Object msg) throws Throwable {
        if (msg instanceof NotificationMessage) {
            persist(msg, m -> { deliverAckMessage((NotificationMessage) m); });
        } else if (msg instanceof Confirm) {
            Confirm confirm = (Confirm) msg;
            confirmMessage(new MsgConfirmed(confirm.deliveryId));
        } else if (msg instanceof UnconfirmedWarning) {
            UnconfirmedWarning warning = (UnconfirmedWarning) msg;
            warning.getUnconfirmedDeliveries().forEach(d -> {
                log.error("[NOTIFICATION-ACTOR] Unconfirmed Messages: {}", d.message());

                confirmMessage(new MsgConfirmed(d.deliveryId()));
            });
        } else {
            unhandled(msg);
        }
    }

    private void deliverAckMessage(NotificationMessage event) {
        deliver(notificationWorkers.path(), (Function) deliveryId -> new AckMessage(deliveryId, event));
    }

    private void confirmMessage(final MsgConfirmed evt) {
        confirmDelivery(evt.deliveryId);
        deleteMessages(evt.deliveryId);
    }

    public interface NotificationMessage extends Event {}

    public static final @Data class PushMessage implements NotificationMessage {
        private final Long source;
        private final Long target;
        private final String trigger;
        private final ImmutableMap payload;
    }
}

public class NotificationWorkerActor extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private final @NonNull NotificationService notificationService;

    @Autowired
    public NotificationWorkerActor(final NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override public void onReceive(final Object event) throws Throwable {
        if (event instanceof AckMessage) {
            final AckMessage ackMessage = (AckMessage) event;
            NotificationMessage msg = (NotificationMessage) ackMessage.msg;
            log.info("[NOTIFICATION] receive message: {}", msg);

            if (msg instanceof PushMessage) {
                final PushMessage m = (PushMessage) msg;
                log.info("[NOTIFICATION] send push notification from: {} to: {}", m.getSource(), m.getTarget());
                notificationService.notify(m.getSource(), m.getTarget(), m.getTrigger(), m.getPayload());
            }
            sender().tell(new Confirm(ackMessage.deliveryId), self());
        } else {
            unhandled(event);
        }
    }
}

public class OrderService {
    public void createOrder() {
        actorSystemManager.notificationActor.tell(
          new PushMessage(), ActorRef.noSender()
        );
    }
}

最早實施這個方案的時候遇到一個問題,說一下這個問題如何產(chǎn)生的。我們一共有三臺服務(wù)器,三臺服務(wù)器都會部署同樣的代碼,以NotificationActor為例,它會分別部署在三個機器上。actor journal我們使用mysql存儲。akka persistent actor內(nèi)部有一個sequence number用來對接收到的消息進行計數(shù),這個數(shù)字是遞增的。同時這個數(shù)字也會在journal中記錄。最初我的persistenceId方法是這樣實現(xiàn)的:

@Override public String persistenceId() {
    return "journal:notification-actor";
}

那么,假如server1上的NotificationActor接收了一個消息,那么它的sequence number會變成1,mysql中將會存儲的sequence number為1的消息。這時server2上也接收到了一個消息,因為它的最初sequence number是0,所以它也會把現(xiàn)在接收到的消息的sequence number設(shè)置為1。但是顯然這條消息是不能持久化的,因為它和數(shù)據(jù)庫記錄的sequence number沖突了。根本原因是三臺服務(wù)器上的NotificationActor的persistenceId是一樣的。

上邊代碼中給出了一種方案,把persistenceId變成random的,每次actor啟動的時候都會得到不同的persistenceId,這樣就解決了上述問題。還有一種方案是引入akka cluster,使用akka singleton。這種方案會在下一篇文章中詳細說明。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://www.ezyhdfw.cn/yun/66470.html

相關(guān)文章

  • 使用Akka Cluster Singleton實現(xiàn)集群單例

    摘要:接下來會選擇一個最老的實例并在上面創(chuàng)建單例??梢源_保整個集群中至多有一個的實例,言下之意,存在沒有實例的時刻。訪問需要借助于,會把所有的消息給當前被代理的實例。 上篇文章主要講了如何使用Akka作異步任務(wù)處理。最后還拋出一個問題。 具體問題的描述就不在這篇文章贅述了,我們僅簡單回顧一下第一種解決方案:覆寫persistenceId()時,加一個UUID,這樣三臺服務(wù)器上的Actor就不...

    xiangzhihong 評論0 收藏0
  • 使用Akka Cluster Singleton實現(xiàn)集群單例

    摘要:接下來會選擇一個最老的實例并在上面創(chuàng)建單例。可以確保整個集群中至多有一個的實例,言下之意,存在沒有實例的時刻。訪問需要借助于,會把所有的消息給當前被代理的實例。 上篇文章主要講了如何使用Akka作異步任務(wù)處理。最后還拋出一個問題。 具體問題的描述就不在這篇文章贅述了,我們僅簡單回顧一下第一種解決方案:覆寫persistenceId()時,加一個UUID,這樣三臺服務(wù)器上的Actor就不...

    TZLLOG 評論0 收藏0
  • JVM并發(fā)編程模型覽

    摘要:本文介紹和點評上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線程阻塞,導(dǎo)致等待?;灸P瓦@是最簡單的模型,創(chuàng)建線程來執(zhí)行一個任務(wù),完畢后銷毀線程。響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領(lǐng)域的的編程模型。 本文介紹和點評JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗...

    cppowboy 評論0 收藏0
  • JVM并發(fā)編程模型覽

    摘要:本文介紹和點評上的等并發(fā)編程模型。異步更適合并發(fā)編程。同步使線程阻塞,導(dǎo)致等待。基本模型這是最簡單的模型,創(chuàng)建線程來執(zhí)行一個任務(wù),完畢后銷毀線程。響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程模式。起源于電信領(lǐng)域的的編程模型。 本文介紹和點評JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并發(fā)編程模型。本人經(jīng)驗...

    wudengzan 評論0 收藏0
  • 關(guān)于分布式計算的一些概念

    摘要:關(guān)于三者的一些概括總結(jié)離線分析框架,適合離線的復(fù)雜的大數(shù)據(jù)處理內(nèi)存計算框架,適合在線離線快速的大數(shù)據(jù)處理流式計算框架,適合在線的實時的大數(shù)據(jù)處理我是一個以架構(gòu)師為年之內(nèi)目標的小小白。 整理自《架構(gòu)解密從分布式到微服務(wù)》第七章——聊聊分布式計算.做了相應(yīng)補充和修改。 [TOC] 前言 不管是網(wǎng)絡(luò)、內(nèi)存、還是存儲的分布式,它們最終目的都是為了實現(xiàn)計算的分布式:數(shù)據(jù)在各個計算機節(jié)點上流動,同...

    Ververica 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<