MQ可靠生产

JAVA ## 生产者 ``` package com.study.rabbitmq.a132.confirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.TimeoutException; // 可靠生产 // https://www.rabbitmq.com/confirms.html public class Producer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("192.168.100.242"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; try { // 3、从连接工厂获取连接 connection = factory.newConnection("生产者"); // 4、从链接中创建通道 channel = connection.createChannel(); // 进入confirm模式, 每次发送消息,rabbtiqm处理之后会返回一个对应的回执消息 AMQP.Confirm.SelectOk selectOk = channel.confirmSelect(); // 增加监听器 ArrayList<String> queues = new ArrayList<>(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { // deliveryTag 同一个channel中此条消息的编号 。 // 业务.. System.out.println("受理成功 " + queues.get((int) deliveryTag) + " " + multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { // 失败重发 // queues.get((int) deliveryTag) System.out.println("受理失败 " + deliveryTag); } }); // 定义fanout类型的交换器 channel.exchangeDeclare("ps_test", "fanout"); for (int i = 0; i < 10; i++) { // 消息内容 String message = "Hello Confirm " + i; queues.add(message); // 发送消息到ps_test交换器上 AMQP.BasicProperties basicProperties = new AMQP.BasicProperties(); channel.basicPublish("ps_test", "", basicProperties, message.getBytes()); System.out.println("消息 " + message + " 已发送!"); } // 等待20秒 Thread.sleep(20 * 1000L); } catch (Exception e) { e.printStackTrace(); } finally { // 7、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 8、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } } ``` ## 消费者 ``` package com.study.rabbitmq.a132.confirm; import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; /** * 消息确认机制 */ public class Consumer { private static Runnable receive = new Runnable() { public void run() { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置连接属性 factory.setHost("192.168.100.242"); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; Channel channel = null; final String clientName = Thread.currentThread().getName(); try { // 3、从连接工厂获取连接 connection = factory.newConnection("消费者"); // ###死信队列相关:专门用来存储 出错 出异常的数据 channel = connection.createChannel(); // 1、 创建一个exchange channel.exchangeDeclare("dlq_exchange", "fanout"); // 2、 创建一个queue,和exchange绑定起来 channel.queueDeclare("dlq_queue1", false, false, false, null); channel.queueBind("dlq_queue1", "dlq_exchange", ""); // ######死信队列结束 // 4、从链接中创建通道 channel = connection.createChannel(); // 代码定义交换器 channel.exchangeDeclare("ps_test", "fanout"); // 还可以定义一个临时队列,连接关闭后会自动删除,此队列是一个排他队列 String queueName = "queue1"; // 队列中有死信产生时,消息会转发到交换器 dlq_exchange。 Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "dlq_exchange"); channel.queueDeclare(queueName, false, false, false, args); // 将队列和交换器绑定 channel.queueBind(queueName, "ps_test", ""); // 监听队列 Channel finalChannel = channel; channel.basicConsume(queueName, false, "消费者-手动回执", new DefaultConsumer(finalChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("收到消息: " + new String(body)); // TODO 业务处理 long deliveryTag = envelope.getDeliveryTag(); // 模拟业务处理耗时 Thread.sleep(1000L); // 正常消费 // finalChannel.basicAck(deliveryTag, false); // 异常消费 finalChannel.basicNack(envelope.getDeliveryTag(), false, false); } catch (InterruptedException e) { // 异常消费, requeue参数 true重发,false不重发(丢弃或者移到DLQ死信队列) // finalChannel.basicNack(envelope.getDeliveryTag(), false, false); e.printStackTrace(); } } }); System.out.println(clientName + " 开始接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 8、关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 9、关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(receive, "c1").start(); } } ```