RabbitMQ

# Rabbitmq 了解 ## 是什么 ![image.png](https://cos.easydoc.net/31477061/files/kqc3tpsc.png) ## 配置 ![image.png](https://cos.easydoc.net/31477061/files/kqc3vcvq.png) ## 常用端口 ![image.png](https://cos.easydoc.net/31477061/files/kqc3xpjd.png) ![image.png](https://cos.easydoc.net/31477061/files/kqc3yabg.png) ![image.png](https://cos.easydoc.net/31477061/files/kqc3zbw4.png) ![image.png](https://cos.easydoc.net/31477061/files/kqc3zhg2.png) ## 管理界面 ![image.png](https://cos.easydoc.net/31477061/files/kqc4528i.png) ![image.png](https://cos.easydoc.net/31477061/files/kqc42ait.png) ### 权限种类 none management policymaker monitoring administrator ![image.png](https://cos.easydoc.net/31477061/files/kqc46bne.png) ![image.png](https://cos.easydoc.net/31477061/files/kqc48pwl.png) ![image.png](https://cos.easydoc.net/31477061/files/kqc49m5c.png) # java使用 ## 添加依赖 ![image.png](https://cos.easydoc.net/31477061/files/kqc4e7kg.png) ![image.png](https://cos.easydoc.net/31477061/files/kqc4eged.png) ## 生产者代码 ``` package com.study.rabbitmq.a132.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 简单队列生产者 * 使用RabbitMQ的默认交换器发送消息 */ public class Producer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("192.168.100.242"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3、从连接工厂获取连接 connection = factory.newConnection("生产者"); // 4、从链接中创建通道 channel = connection.createChannel(); /** * 5、声明(创建)队列 * 如果队列不存在,才会创建 * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 * * queueDeclare参数说明: * @param queue 队列名称 * @param durable 队列是否持久化 * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制 * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除 * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等 */ channel.queueDeclare("queue1", false, false, false, null); // 消息内容 String message = "Hello World!"; // 6、发送消息 将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 RoutingKey 与队列名称相同。 channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息已发送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 7、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } ``` ## 生产者代码2 ``` package com.study.rabbitmq.a132.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Topic--生产者 * * 生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符 */ public class Producer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("192.168.100.242"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3、从连接工厂获取连接 connection = factory.newConnection("生产者"); // 4、从链接中创建通道 channel = connection.createChannel(); // 路由关系如下:com.# --> queue-1 *.order.* ---> queue-2 // 消息内容 String message = "Hello A"; // 发送消息到topic_test交换器上 channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes()); System.out.println("消息 " + message + " 已发送!"); // 消息内容 message = "Hello B"; // 发送消息到topic_test交换器上 channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes()); System.out.println("消息 " + message + " 已发送!"); // 消息内容 message = "Hello C"; // 发送消息到topic_test交换器上 channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes()); System.out.println("消息 " + message + " 已发送!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 7、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } ``` ## 消费者代码 ``` package com.study.rabbitmq.a132.simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 简单队列消费者 */ public class Consumer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("192.168.100.242"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3、从连接工厂获取连接 connection = factory.newConnection("消费者"); // 4、从链接中创建通道 channel = connection.createChannel(); /** * 5、声明(创建)队列 * 如果队列不存在,才会创建 * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错 * * queueDeclare参数说明: * @param queue 队列名称 * @param durable 队列是否持久化 * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问, * 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。 * 一般在队列和交换器绑定时使用 * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除 * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等 */ channel.queueDeclare("queue1", false, false, false, null); // 6、定义收到消息后的回调 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 7、监听队列 channel.basicConsume("queue1", true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println("开始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 8、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 9、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } ``` ## 消费者代码2 ``` package com.study.rabbitmq.a132.routing; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 路由--消费者 * * 消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息 */ public class Consumer { private static Runnable receive = new Runnable() { public void run() { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("192.168.100.242"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; final String queueName = Thread.currentThread().getName(); try { // 3、从连接工厂获取连接 connection = factory.newConnection("消费者"); // 4、从链接中创建通道 channel = connection.createChannel(); // 定义消息接收回调对象 DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8")); } }; // 监听队列 channel.basicConsume(queueName, true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } }); System.out.println(queueName + " 开始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 8、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 9、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(receive, "queue-1").start(); new Thread(receive, "queue-2").start(); new Thread(receive, "queue-3").start(); } } ``` ## 消费者 添加一次接收消息数量 ![image.png](https://cos.easydoc.net/31477061/files/ks5hdnq4.png) ## 代码定义交换器 及绑定 ![image.png](https://cos.easydoc.net/31477061/files/ks5i4spj.png) ## 创建一个exchange 创建一个queue,和exchange绑定起来 ![image.png](https://cos.easydoc.net/31477061/files/ks5nfw2t.png) ## rabbit 图解 ![image.png](https://cos.easydoc.net/31477061/files/kqc8o0pe.png) ## java代码 访问交换机