摘要:消息持久化控制的屬性就是消息的持久化。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),兩個(gè)消費(fèi)者都會(huì)收到消息并處理當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為時(shí),只有消費(fèi)者可以接收到消息。八的消息確認(rèn)機(jī)制在中,可以通過(guò)持久化數(shù)據(jù)解決服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題。
一、內(nèi)容大綱&使用場(chǎng)景 1. 消息隊(duì)列解決了什么問(wèn)題?
異步處理
應(yīng)用解耦
流量削鋒
日志處理
......
2. rabbitMQ安裝與配置 3. Java操作rabbitMQsimple 簡(jiǎn)單隊(duì)列
. work queues 工作隊(duì)列 公平分發(fā) 輪詢分發(fā)
. publish/subscribe 發(fā)布訂閱
. routing 路由選擇 通配符模式
. Topics 主題
手動(dòng)和自動(dòng)確認(rèn)消息
隊(duì)列的持久化和非持久化
rabbitMQ的延遲隊(duì)列
4. Spring AMQP Spring-Rabbit 5. DEMOMQ實(shí)現(xiàn)搜索引擎DIH增量
未支付訂單30分鐘 取消
類似百度統(tǒng)計(jì) cnzz 架構(gòu) 消息隊(duì)列
二、用戶及vhost配置 2.1 添加用戶 2.2 virtual hosts管理virtual hosts相當(dāng)于mysql的db
一般以/開頭
2.3 用戶授權(quán)需要對(duì)用戶進(jìn)行授權(quán)
三、簡(jiǎn)單隊(duì)列 3.1 模型P:消息生產(chǎn)者
紅色:隊(duì)列
C:消息消費(fèi)者
包含三個(gè)對(duì)象:生產(chǎn)者、隊(duì)列、消費(fèi)者
3.2 獲取mq連接import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { /** * 獲取MQ的連接 * @return */ public static Connection getConnection() throws IOException, TimeoutException { //定義一個(gè)連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); //AMQP的端口 factory.setPort(5672); //vhost factory.setVirtualHost("/vhost_mmr"); factory.setUsername("rabbit"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection; } }3.3 生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個(gè)通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello world!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); channel.close(); connection.close(); } }3.4 消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Receive { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("msg receive : " + msg); } }; channel.basicConsume(QUEUE_NAME, consumer); } }3.5 簡(jiǎn)單隊(duì)列的不足
耦合性高,生產(chǎn)者一一對(duì)應(yīng)消費(fèi)者,如果需要多個(gè)消費(fèi)者消費(fèi)隊(duì)列中的消息,此時(shí)簡(jiǎn)單隊(duì)列就無(wú)能為力了。
隊(duì)列名變更,源碼需要同時(shí)變更
四、Work隊(duì)列 4.1 模型一個(gè)生產(chǎn)者將消息放入隊(duì)列中,可以有多個(gè)消費(fèi)者進(jìn)行消費(fèi)
為什么會(huì)出現(xiàn)工作隊(duì)列?
Simple隊(duì)列:是一一對(duì)應(yīng)的,實(shí)際開發(fā)中,生產(chǎn)者改善消息是毫不費(fèi)力的,而消費(fèi)者一般需要跟業(yè)務(wù)相結(jié)合,消費(fèi)者接收到消息之后就需要處理,可能需要花費(fèi)時(shí)間,此時(shí)隊(duì)列就會(huì)積壓很多消息。
4.2 輪詢分發(fā)生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個(gè)通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消費(fèi)者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費(fèi)者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }; boolean ack = true; channel.basicConsume(QUEUE_NAME, ack, consumer); } }
現(xiàn)象:
消費(fèi)者1和消費(fèi)者2處理的消息是一樣多的,這種分發(fā)方式稱為輪詢分發(fā)(round-robin),不管誰(shuí)忙或者誰(shuí)閑,都不會(huì)多給或者少給。任務(wù)均分。
4.3 公平分發(fā) fair dispatch保證一次發(fā)送給消費(fèi)者的消息不超過(guò)一條
/** * 每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息給消費(fèi)者,消費(fèi)者一次只處理一個(gè)消息 * * 限制發(fā)送給同一個(gè)消費(fèi)者不得超過(guò)一條消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount);
使用公平分發(fā),必須關(guān)閉自動(dòng)應(yīng)答ack,改為手動(dòng)
channel.basicAck(envelope.getDeliveryTag(), false); boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer);
生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //從連接中獲取一個(gè)通道 Channel channel = connection.createChannel(); //創(chuàng)建隊(duì)列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 每個(gè)消費(fèi)者發(fā)送確認(rèn)消息之前,消息隊(duì)列不發(fā)送下一個(gè)消息給消費(fèi)者,消費(fèi)者一次只處理一個(gè)消息 * * 限制發(fā)送給同一個(gè)消費(fèi)者不得超過(guò)一條消息 */ int preFetchCount = 1; channel.basicQos(preFetchCount); for (int i = 0; i < 50; i++) { String msg = "hello " + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("---send msg :" + msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } channel.close(); connection.close(); } }
消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); //創(chuàng)建channel Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv1 : " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }4.4 消息應(yīng)答與消息持久化 4.4.1 消息應(yīng)答
boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer);
ack = true時(shí)為自動(dòng)確認(rèn)模式,一旦rabbitMQ將消息分發(fā)給消費(fèi)者,該消息就會(huì)在內(nèi)存中刪除;這種情況下,如果殺死正在處理消息的消費(fèi)者,會(huì)丟失正在處理的消息;
ack = false時(shí)為手動(dòng)回執(zhí)(消息應(yīng)答)模式,如果有一個(gè)消費(fèi)者掛掉,就會(huì)將會(huì)給其他消費(fèi)者,rabbitMQ支持消息應(yīng)答,消費(fèi)者發(fā)送一個(gè)消息應(yīng)答,告訴rabbitMQ這個(gè)消息已經(jīng)被處理,然后rabbitMQ就刪除內(nèi)存中的消息;
消息應(yīng)答默認(rèn)打開,即為false;
由于消息在內(nèi)存中存儲(chǔ),如果rabbitMQ掛掉,消息仍然會(huì)丟失。
4.4.2 消息持久化boolean durable = false; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
durable控制的屬性就是消息的持久化。
已經(jīng)聲明好的隊(duì)列,如果durable已經(jīng)為false了,就無(wú)法修改為true,rabbitMQ不允許重新定義(不同參數(shù))一個(gè)已存在的隊(duì)列
五、訂閱模式 Publish/Subscribe 5.1 模型解讀:
1、一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者;
2、每個(gè)消費(fèi)者都有自己的隊(duì)列;
3、生產(chǎn)者沒有直接把消息發(fā)送到隊(duì)列,而是發(fā)送至交換機(jī)(eXchange)
4、每個(gè)隊(duì)列都要綁定到交換機(jī)上
5、生產(chǎn)者發(fā)送的消息,經(jīng)過(guò)交換機(jī),到達(dá)隊(duì)列,就能實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者消費(fèi)
5.2 實(shí)現(xiàn)生產(chǎn)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機(jī) channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "hello ps"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("Send " + msg); channel.close(); connection.close(); } }
消息哪去了?丟失了!因?yàn)榻粨Q機(jī)沒有存儲(chǔ)能力,在rabbitMQ中,只有隊(duì)列有存儲(chǔ)能力。此時(shí)并沒有完成隊(duì)列綁定到交換機(jī),所以數(shù)據(jù)丟失了。
消費(fèi)消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String QUEUE_NAME = "test_ps_fanout_email"; private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); //綁定隊(duì)列到交換機(jī) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
不同的隊(duì)列做不同的事情。
5.3 Exchange(交換機(jī)、轉(zhuǎn)發(fā)器)一方面接收生產(chǎn)者的消息,另一方面向隊(duì)列推送消息
rabbitMQ提供了四種Exchange:fanout,direct,topic,header? header模式在實(shí)際使用中較少。
fanout:不處理路由鍵
direct:處理路由鍵
topic
將路由鍵和某模式進(jìn)行匹配
任何發(fā)送到Topic Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定話題的Queue上
六、路由模式 6.1 模型聲明exchange時(shí)指定為direct模式
綁定隊(duì)列時(shí),指定路由鍵
6.2 實(shí)現(xiàn)生產(chǎn)者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String msg = "hello direct"; //指定路由鍵 String routingKey = "warning"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消費(fèi)者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊(duì)列與交換機(jī)時(shí),指定路由鍵 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費(fèi)者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct"; private static final String QUEUE_NAME = "test_queue_direct_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //綁定隊(duì)列與交換機(jī)時(shí),指定路由鍵 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }七、Topic模式 7.1 模型
# 匹配一個(gè)或多個(gè)
* 匹配一個(gè)
7.2 實(shí)現(xiàn)生產(chǎn)者
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明exchange,指定模式為topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String msg = "商品...."; String routingKey = "goods.delete"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send msg:" + msg); channel.close(); connection.close(); } }
消費(fèi)者1
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[1] msg recv1 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
消費(fèi)者2
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by wangbin on 2018/6/26. */ public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_2"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf8"); System.out.println("[2] msg recv2 : " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean ack = false;//自動(dòng)應(yīng)答改為false channel.basicConsume(QUEUE_NAME, ack, consumer); } }
其中,消費(fèi)者1綁定路由鍵為goods.#,消費(fèi)者2綁定路由鍵為goods.add。當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.add時(shí),兩個(gè)消費(fèi)者都會(huì)收到消息并處理;當(dāng)生產(chǎn)者發(fā)送的消息路由鍵為goods.update時(shí),只有消費(fèi)者1可以接收到消息。
八、RabbitMQ的消息確認(rèn)機(jī)制在rabbitMQ中,可以通過(guò)持久化數(shù)據(jù)解決rabbitMQ服務(wù)器異常的數(shù)據(jù)丟失問(wèn)題。
問(wèn)題:生產(chǎn)者將消息發(fā)送出去之后,消息到底有沒有到達(dá)rabbitMQ服務(wù)器;默認(rèn)情況是不知道消息已到達(dá)的
兩種方式:
AMQP實(shí)現(xiàn)了事務(wù)機(jī)制
confirm模式
8.1 事務(wù)機(jī)制txSelect
用于將當(dāng)前channel設(shè)置成transaction模式
txCommit
用于提交事務(wù)
txRollback
回滾事務(wù)
生產(chǎn)者發(fā)送消息
import com.meituan.mq.simple.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Send { private static final String QUEUE_NAME = "test_queue_tx"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello tx msg!"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.txCommit(); } catch (IOException e) { channel.txRollback(); System.out.println("發(fā)生異常,事務(wù)已回滾"); } } }
事務(wù)機(jī)制會(huì)降低rabbitMQ的吞吐量。
8.2 Confirm模式生產(chǎn)者將信道設(shè)置成confirm模式,一旦信道進(jìn)入confirm模式,所有在該信道上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊(duì)列之后,broker就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫入磁盤之后發(fā)出,broker回傳給生產(chǎn)者的確認(rèn)消息中delivery-tag域包含了確認(rèn)消息的序列號(hào),此外broker也可以設(shè)置basic.ack的multiple域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理;
confirm模式最大的好處在于他是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方法來(lái)處理該確認(rèn)消息,如果RabbitMQ因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條nack消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息。
編程模式:
1、普通,發(fā)一條
2、批量,發(fā)一批
3、異步confirm模式,提供一個(gè)回調(diào)方法
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/71406.html
摘要:性能調(diào)優(yōu)筆記避免雷區(qū)要避免流控機(jī)制觸發(fā)服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到,磁盤空閑空間小于,即啟動(dòng)內(nèi)存報(bào)警,磁盤報(bào)警報(bào)警后服務(wù)端觸發(fā)流控機(jī)制。最佳線程生產(chǎn)者使用多線程發(fā)送數(shù)據(jù)到三到五個(gè)線程性能發(fā)送最佳,超過(guò)它也不能提高生產(chǎn)的發(fā)送速率。 RabbitMq 性能調(diào)優(yōu)筆記 [TOC] 避免雷區(qū) 要避免流控機(jī)制觸發(fā) 服務(wù)端默認(rèn)配置是當(dāng)內(nèi)存使用達(dá)到40%,磁盤空閑空間小于50M,即啟動(dòng)內(nèi)存報(bào)警,磁...
摘要:慕課網(wǎng)消息中間件極速入門與實(shí)戰(zhàn)學(xué)習(xí)總結(jié)時(shí)間年月日星期三說(shuō)明本文部分內(nèi)容均來(lái)自慕課網(wǎng)。 慕課網(wǎng)《RabbitMQ消息中間件極速入門與實(shí)戰(zhàn)》學(xué)習(xí)總結(jié) 時(shí)間:2018年09月05日星期三 說(shuō)明:本文部分內(nèi)容均來(lái)自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:無(wú) 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:RabbitM...
本文是公眾號(hào)讀者jianfeng投稿的面試經(jīng)驗(yàn)恭喜該同學(xué)成功轉(zhuǎn)型目錄:毅然轉(zhuǎn)型,沒頭蒼蠅制定目標(biāo),系統(tǒng)學(xué)習(xí)面試經(jīng)歷毅然轉(zhuǎn)崗,沒頭蒼蠅首先,介紹一下我的背景。本人坐標(biāo)廣州,2016年畢業(yè)于一個(gè)普通二本大學(xué),曾經(jīng)在某機(jī)構(gòu)培訓(xùn)過(guò)Android。2018年初的時(shí)候已經(jīng)在兩家小公司工作干了兩年的android開發(fā),然后會(huì)一些Tomcat、Servlet之類的技術(shù),當(dāng)時(shí)的年薪大概也就15萬(wàn)這樣子。由于個(gè)人發(fā)展...
摘要:可以在地址看到如何使用講解下上面命令行表示控制臺(tái)端口號(hào),可以在瀏覽器中通過(guò)控制臺(tái)來(lái)執(zhí)行的相關(guān)操作。同時(shí)從控制臺(tái)可以看到發(fā)送的速率多線程測(cè)試性能開了個(gè)線程,每個(gè)線程發(fā)送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...
閱讀 2748·2021-10-22 09:55
閱讀 2149·2021-09-27 13:35
閱讀 1335·2021-08-24 10:02
閱讀 1633·2019-08-30 15:55
閱讀 1273·2019-08-30 14:13
閱讀 3534·2019-08-30 13:57
閱讀 2047·2019-08-30 11:07
閱讀 2534·2019-08-29 17:12