亚洲中字慕日产2020,大陆极品少妇内射AAAAAA,无码av大香线蕉伊人久久,久久精品国产亚洲av麻豆网站

資訊專欄INFORMATION COLUMN

RabbitMQ學(xué)習(xí)筆記

zacklee / 1068人閱讀

摘要:消息持久化控制的屬性就是消息的持久化。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),兩個(gè)消費(fèi)者都會(huì)收到消息并處理當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),只有消費(fèi)者可以接收到消息。八的消息確認(rèn)機(jī)制在中,可以通過(guò)持久化數(shù)據(jù)解決服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題。

一、內(nèi)容大綱&使用場(chǎng)景 1. 消息隊(duì)列解決了什么問(wèn)題?

異步處理

應(yīng)用解耦

流量削鋒

日志處理

......

2. rabbitMQ安裝與配置 3. Java操作rabbitMQ

simple 簡(jiǎn)單隊(duì)列
. work queues 工作隊(duì)列 公平分發(fā) 輪詢分發(fā)
. publish/subscribe 發(fā)布訂閱
. routing 路由選擇 通配符模式
. Topics 主題

手動(dòng)和自動(dòng)確認(rèn)消息

隊(duì)列的持久化和非持久化

rabbitMQ的延遲隊(duì)列

4. Spring AMQP Spring-Rabbit 5. DEMO

MQ實(shí)現(xiàn)搜索引擎DIH增量

未支付訂單30分鐘 取消

類似百度統(tǒng)計(jì) cnzz 架構(gòu) 消息隊(duì)列

二、用戶及vhost配置 2.1 添加用戶

2.2 virtual hosts管理

virtual hosts相當(dāng)于mysql的db

一般以/開頭

2.3 用戶授權(quán)

需要對(duì)用戶進(jìn)行授權(quán)

三、簡(jiǎn)單隊(duì)列 3.1 模型

P:消息生產(chǎn)者

紅色:隊(duì)列

C:消息消費(fèi)者

包含三個(gè)對(duì)象:生產(chǎn)者、隊(duì)列、消費(fèi)者

3.2 獲取mq連接
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {
    /**
     * 獲取MQ的連接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //定義一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("127.0.0.1");
        //AMQP的端口
        factory.setPort(5672);
        //vhost
        factory.setVirtualHost("/vhost_mmr");
        factory.setUsername("rabbit");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        return connection;
    }
}
3.3 生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_simple_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個(gè)通道
        Channel channel = connection.createChannel();

        //創(chuàng)建隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello world!";

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        System.out.println("---send msg :" + msg);
        channel.close();
        connection.close();
    }
}
3.4 消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receive {
    private static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("msg receive : " + msg);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
    }
}
3.5 簡(jiǎn)單隊(duì)列的不足

耦合性高,生產(chǎn)者一一對(duì)應(yīng)消費(fèi)者,如果需要多個(gè)消費(fèi)者消費(fèi)隊(duì)列中的消息,此時(shí)簡(jiǎn)單隊(duì)列就無(wú)能為力了。

隊(duì)列名變更,源碼需要同時(shí)變更

四、Work隊(duì)列 4.1 模型

一個(gè)生產(chǎn)者將消息放入隊(duì)列中,可以有多個(gè)消費(fèi)者進(jìn)行消費(fèi)

為什么會(huì)出現(xiàn)工作隊(duì)列?

Simple隊(duì)列:是一一對(duì)應(yīng)的,實(shí)際開發(fā)中,生產(chǎn)者改善消息是毫不費(fèi)力的,而消費(fèi)者一般需要跟業(yè)務(wù)相結(jié)合,消費(fèi)者接收到消息之后就需要處理,可能需要花費(fèi)時(shí)間,此時(shí)隊(duì)列就會(huì)積壓很多消息。

4.2 輪詢分發(fā)

生產(chǎn)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個(gè)通道
        Channel channel = connection.createChannel();

        //創(chuàng)建隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            String msg = "hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("---send msg :" + msg);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        channel.close();
        connection.close();
    }
}

消費(fèi)者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean ack = true;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費(fèi)者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv1 : " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean ack = true;
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

現(xiàn)象

消費(fèi)者1和消費(fèi)者2處理的消息是一樣多的,這種分發(fā)方式稱為輪詢分發(fā)(round-robin),不管誰(shuí)忙或者誰(shuí)閑,都不會(huì)多給或者少給。任務(wù)均分。

4.3 公平分發(fā) fair dispatch

保證一次發(fā)送給消費(fèi)者的消息不超過(guò)一條

        /**
         * 每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息給消費(fèi)者,消費(fèi)者一次只處理一個(gè)消息
         *
         * 限制發(fā)送給同一個(gè)消費(fèi)者不得超過(guò)一條消息
         */
        int preFetchCount = 1;
        channel.basicQos(preFetchCount);

使用公平分發(fā),必須關(guān)閉自動(dòng)應(yīng)答ack,改為手動(dòng)

channel.basicAck(envelope.getDeliveryTag(), false);
boolean ack = false;//自動(dòng)應(yīng)答改為false
channel.basicConsume(QUEUE_NAME, ack, consumer);

生產(chǎn)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_work_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //從連接中獲取一個(gè)通道
        Channel channel = connection.createChannel();

        //創(chuàng)建隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /**
         * 每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息給消費(fèi)者,消費(fèi)者一次只處理一個(gè)消息
         *
         * 限制發(fā)送給同一個(gè)消費(fèi)者不得超過(guò)一條消息
         */
        int preFetchCount = 1;
        channel.basicQos(preFetchCount);

        for (int i = 0; i < 50; i++) {
            String msg = "hello " + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            System.out.println("---send msg :" + msg);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        channel.close();
        connection.close();
    }
}

消費(fèi)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        //創(chuàng)建channel
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv1 : " + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動(dòng)應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
4.4 消息應(yīng)答與消息持久化 4.4.1 消息應(yīng)答
boolean ack = false;//自動(dòng)應(yīng)答改為false
channel.basicConsume(QUEUE_NAME, ack, consumer);

ack = true時(shí)為自動(dòng)確認(rèn)模式,一旦rabbitMQ將消息分發(fā)給消費(fèi)者,該消息就會(huì)在內(nèi)存中刪除;這種情況下,如果殺死正在處理消息的消費(fèi)者,會(huì)丟失正在處理的消息;

ack = false時(shí)為手動(dòng)回執(zhí)(消息應(yīng)答)模式,如果有一個(gè)消費(fèi)者掛掉,就會(huì)將會(huì)給其他消費(fèi)者,rabbitMQ支持消息應(yīng)答,消費(fèi)者發(fā)送一個(gè)消息應(yīng)答,告訴rabbitMQ這個(gè)消息已經(jīng)被處理,然后rabbitMQ就刪除內(nèi)存中的消息;

消息應(yīng)答默認(rèn)打開,即為false;

由于消息在內(nèi)存中存儲(chǔ),如果rabbitMQ掛掉,消息仍然會(huì)丟失。

4.4.2 消息持久化
boolean durable = false;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

durable控制的屬性就是消息的持久化。

已經(jīng)聲明好的隊(duì)列,如果durable已經(jīng)為false了,就無(wú)法修改為true,rabbitMQ不允許重新定義(不同參數(shù))一個(gè)已存在的隊(duì)列

五、訂閱模式 Publish/Subscribe 5.1 模型

解讀:

1、一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者;

2、每個(gè)消費(fèi)者都有自己的隊(duì)列;

3、生產(chǎn)者沒有直接把消息發(fā)送到隊(duì)列,而是發(fā)送至交換機(jī)(eXchange)

4、每個(gè)隊(duì)列都要綁定到交換機(jī)上

5、生產(chǎn)者發(fā)送的消息,經(jīng)過(guò)交換機(jī),到達(dá)隊(duì)列,就能實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)

5.2 實(shí)現(xiàn)

生產(chǎn)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String msg = "hello ps";

        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        System.out.println("Send " + msg);

        channel.close();
        connection.close();
    }
}

消息哪去了?丟失了!因?yàn)榻粨Q機(jī)沒有存儲(chǔ)能力,在rabbitMQ中,只有隊(duì)列有存儲(chǔ)能力。此時(shí)并沒有完成隊(duì)列綁定到交換機(jī),所以數(shù)據(jù)丟失了。

消費(fèi)消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String QUEUE_NAME = "test_ps_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動(dòng)應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

不同的隊(duì)列做不同的事情。

5.3 Exchange(交換機(jī)、轉(zhuǎn)發(fā)器)

一方面接收生產(chǎn)者的消息,另一方面向隊(duì)列推送消息

rabbitMQ提供了四種Exchange:fanout,direct,topic,header? header模式在實(shí)際使用中較少。

fanout:不處理路由鍵

direct:處理路由鍵

topic

將路由鍵和某模式進(jìn)行匹配

任何發(fā)送到Topic Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定話題的Queue上

六、路由模式 6.1 模型

聲明exchange時(shí)指定為direct模式

綁定隊(duì)列時(shí),指定路由鍵

6.2 實(shí)現(xiàn)

生產(chǎn)者

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String msg = "hello direct";

        //指定路由鍵
        String routingKey = "warning";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("send msg:" + msg);
        channel.close();
        connection.close();
    }
}

消費(fèi)者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        //綁定隊(duì)列與交換機(jī)時(shí),指定路由鍵
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");


        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動(dòng)應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費(fèi)者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    private static final String QUEUE_NAME = "test_queue_direct_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        //綁定隊(duì)列與交換機(jī)時(shí),指定路由鍵
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv2 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動(dòng)應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}
七、Topic模式 7.1 模型

# 匹配一個(gè)或多個(gè)

* 匹配一個(gè)

7.2 實(shí)現(xiàn)

生產(chǎn)者

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        //聲明exchange,指定模式為topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String msg = "商品....";

        String routingKey = "goods.delete";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());

        System.out.println("send msg:" + msg);
        channel.close();
        connection.close();
    }
}

消費(fèi)者1

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by wangbin on 2018/6/26.
 */
public class Recv1 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[1] msg recv1 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動(dòng)應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

消費(fèi)者2

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by wangbin on 2018/6/26.
 */
public class Recv2 {
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    private static final String QUEUE_NAME = "test_queue_topic_2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf8");
                System.out.println("[2] msg recv2 : " + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean ack = false;//自動(dòng)應(yīng)答改為false
        channel.basicConsume(QUEUE_NAME, ack, consumer);
    }
}

其中,消費(fèi)者1綁定路由鍵為goods.#,消費(fèi)者2綁定路由鍵為goods.add。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.add時(shí),兩個(gè)消費(fèi)者都會(huì)收到消息并處理;當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.update時(shí),只有消費(fèi)者1可以接收到消息。

八、RabbitMQ的消息確認(rèn)機(jī)制

在rabbitMQ中,可以通過(guò)持久化數(shù)據(jù)解決rabbitMQ服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題。

問(wèn)題:生產(chǎn)者將消息發(fā)送出去之后,消息到底有沒有到達(dá)rabbitMQ服務(wù)器;默認(rèn)情況是不知道消息已到達(dá)的

兩種方式:

AMQP實(shí)現(xiàn)了事務(wù)機(jī)制

confirm模式

8.1 事務(wù)機(jī)制

txSelect

用于將當(dāng)前channel設(shè)置成transaction模式

txCommit

用于提交事務(wù)

txRollback

回滾事務(wù)

生產(chǎn)者發(fā)送消息

import com.meituan.mq.simple.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    private static final String QUEUE_NAME = "test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "hello tx msg!";

        try {
            channel.txSelect();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            channel.txCommit();
        } catch (IOException e) {
            channel.txRollback();
            System.out.println("發(fā)生異常,事務(wù)已回滾");
        }
    }
}

事務(wù)機(jī)制會(huì)降低rabbitMQ的吞吐量。

8.2 Confirm模式

生產(chǎn)者將信道設(shè)置成confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫入磁盤之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號(hào),此外broker也可以設(shè)置basic.ack的multiple域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理;

confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方法來(lái)處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息。

編程模式:

1、普通,發(fā)一條

2、批量,發(fā)一批

3、異步confirm模式,提供一個(gè)回調(diào)方法

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/71406.html

相關(guān)文章

  • RabbitMq 最全的性能調(diào)優(yōu)筆記

    摘要:性能調(diào)優(yōu)筆記避免雷區(qū)要避免流控機(jī)制觸發(fā)服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到,磁盤空閑空間小于,即啟動(dòng)內(nèi)存報(bào)警,磁盤報(bào)警報(bào)警后服務(wù)端觸發(fā)流控機(jī)制。最佳線程生產(chǎn)者使用多線程發(fā)送數(shù)據(jù)到三到五個(gè)線程性能發(fā)送最佳,超過(guò)它也不能提高生產(chǎn)的發(fā)送速率。 RabbitMq 性能調(diào)優(yōu)筆記 [TOC] 避免雷區(qū) 要避免流控機(jī)制觸發(fā) 服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到40%,磁盤空閑空間小于50M,即啟動(dòng)內(nèi)存報(bào)警,磁...

    Tony 評(píng)論0 收藏0
  • 慕課網(wǎng)_《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié)

    摘要:慕課網(wǎng)消息中間件極速入門與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時(shí)間年月日星期三說(shuō)明本文部分內(nèi)容均來(lái)自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié) 時(shí)間:2018年09月05日星期三 說(shuō)明:本文部分內(nèi)容均來(lái)自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無(wú) 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評(píng)論0 收藏0
  • Android工程師轉(zhuǎn)型Java后端開發(fā)之路,自己選的路,跪著也要走下去!

    本文是公眾號(hào)讀者jianfeng投稿的面試經(jīng)驗(yàn)恭喜該同學(xué)成功轉(zhuǎn)型目錄:毅然轉(zhuǎn)型,沒頭蒼蠅制定目標(biāo),系統(tǒng)學(xué)習(xí)面試經(jīng)歷毅然轉(zhuǎn)崗,沒頭蒼蠅首先,介紹一下我的背景。本人坐標(biāo)廣州,2016年畢業(yè)于一個(gè)普通二本大學(xué),曾經(jīng)在某機(jī)構(gòu)培訓(xùn)過(guò)Android。2018年初的時(shí)候已經(jīng)在兩家小公司工作干了兩年的android開發(fā),然后會(huì)一些Tomcat、Servlet之類的技術(shù),當(dāng)時(shí)的年薪大概也就15萬(wàn)這樣子。由于個(gè)人發(fā)展...

    番茄西紅柿 評(píng)論0 收藏0
  • SpringBoot RabbitMQ 整合使用

    摘要:可以在地址看到如何使用講解下上面命令行表示控制臺(tái)端口號(hào),可以在瀏覽器中通過(guò)控制臺(tái)來(lái)執(zhí)行的相關(guān)操作。同時(shí)從控制臺(tái)可以看到發(fā)送的速率多線程測(cè)試性能開了個(gè)線程,每個(gè)線程發(fā)送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...

    yuanxin 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<